Skip to content

Commit

Permalink
[SPARK-24169][SQL] JsonToStructs should not access SQLConf at executo…
Browse files Browse the repository at this point in the history
…r side

## What changes were proposed in this pull request?

This PR is extracted from apache#21190 , to make it easier to backport.

`JsonToStructs` can be serialized to executors and evaluate, we should not call `SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA)` in the body.

## How was this patch tested?

tested in apache#21190

Author: Wenchen Fan <[email protected]>

Closes apache#21226 from cloud-fan/minor4.
  • Loading branch information
cloud-fan committed May 3, 2018
1 parent 991b526 commit 96a5001
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,9 @@ object FunctionRegistry {
// Otherwise, find a constructor method that matches the number of arguments, and use that.
val params = Seq.fill(expressions.size)(classOf[Expression])
val f = constructors.find(_.getParameterTypes.toSeq == params).getOrElse {
val validParametersCount = constructors.map(_.getParameterCount).distinct.sorted
val validParametersCount = constructors
.filter(_.getParameterTypes.forall(_ == classOf[Expression]))
.map(_.getParameterCount).distinct.sorted
val expectedNumberOfParameters = if (validParametersCount.length == 1) {
validParametersCount.head.toString
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,11 +514,10 @@ case class JsonToStructs(
schema: DataType,
options: Map[String, String],
child: Expression,
timeZoneId: Option[String] = None)
timeZoneId: Option[String],
forceNullableSchema: Boolean)
extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes {

val forceNullableSchema = SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA)

// The JSON input data might be missing certain fields. We force the nullability
// of the user-provided schema to avoid data corruptions. In particular, the parquet-mr encoder
// can generate incorrect files if values are missing in columns declared as non-nullable.
Expand All @@ -532,14 +531,21 @@ case class JsonToStructs(
schema = JsonExprUtils.validateSchemaLiteral(schema),
options = Map.empty[String, String],
child = child,
timeZoneId = None)
timeZoneId = None,
forceNullableSchema = SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA))

def this(child: Expression, schema: Expression, options: Expression) =
this(
schema = JsonExprUtils.validateSchemaLiteral(schema),
options = JsonExprUtils.convertToMapData(options),
child = child,
timeZoneId = None)
timeZoneId = None,
forceNullableSchema = SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA))

// Used in `org.apache.spark.sql.functions`
def this(schema: DataType, options: Map[String, String], child: Expression) =
this(schema, options, child, timeZoneId = None,
forceNullableSchema = SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA))

override def checkInputDataTypes(): TypeCheckResult = nullableSchema match {
case _: StructType | ArrayType(_: StructType, _) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
val jsonData = """{"a": 1}"""
val schema = StructType(StructField("a", IntegerType) :: Nil)
checkEvaluation(
JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId),
JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId, true),
InternalRow(1)
)
}
Expand All @@ -401,13 +401,13 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
val jsonData = """{"a" 1}"""
val schema = StructType(StructField("a", IntegerType) :: Nil)
checkEvaluation(
JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId),
JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId, true),
null
)

// Other modes should still return `null`.
checkEvaluation(
JsonToStructs(schema, Map("mode" -> PermissiveMode.name), Literal(jsonData), gmtId),
JsonToStructs(schema, Map("mode" -> PermissiveMode.name), Literal(jsonData), gmtId, true),
null
)
}
Expand All @@ -416,70 +416,70 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
val input = """[{"a": 1}, {"a": 2}]"""
val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
val output = InternalRow(1) :: InternalRow(2) :: Nil
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, true), output)
}

test("from_json - input=object, schema=array, output=array of single row") {
val input = """{"a": 1}"""
val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
val output = InternalRow(1) :: Nil
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, true), output)
}

test("from_json - input=empty array, schema=array, output=empty array") {
val input = "[ ]"
val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
val output = Nil
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, true), output)
}

test("from_json - input=empty object, schema=array, output=array of single row with null") {
val input = "{ }"
val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
val output = InternalRow(null) :: Nil
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, true), output)
}

test("from_json - input=array of single object, schema=struct, output=single row") {
val input = """[{"a": 1}]"""
val schema = StructType(StructField("a", IntegerType) :: Nil)
val output = InternalRow(1)
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, true), output)
}

test("from_json - input=array, schema=struct, output=null") {
val input = """[{"a": 1}, {"a": 2}]"""
val schema = StructType(StructField("a", IntegerType) :: Nil)
val output = null
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, true), output)
}

test("from_json - input=empty array, schema=struct, output=null") {
val input = """[]"""
val schema = StructType(StructField("a", IntegerType) :: Nil)
val output = null
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, true), output)
}

test("from_json - input=empty object, schema=struct, output=single row with null") {
val input = """{ }"""
val schema = StructType(StructField("a", IntegerType) :: Nil)
val output = InternalRow(null)
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, true), output)
}

test("from_json null input column") {
val schema = StructType(StructField("a", IntegerType) :: Nil)
checkEvaluation(
JsonToStructs(schema, Map.empty, Literal.create(null, StringType), gmtId),
JsonToStructs(schema, Map.empty, Literal.create(null, StringType), gmtId, true),
null
)
}

test("SPARK-20549: from_json bad UTF-8") {
val schema = StructType(StructField("a", IntegerType) :: Nil)
checkEvaluation(
JsonToStructs(schema, Map.empty, Literal(badJson), gmtId),
JsonToStructs(schema, Map.empty, Literal(badJson), gmtId, true),
null)
}

Expand All @@ -491,14 +491,14 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
c.set(2016, 0, 1, 0, 0, 0)
c.set(Calendar.MILLISECOND, 123)
checkEvaluation(
JsonToStructs(schema, Map.empty, Literal(jsonData1), gmtId),
JsonToStructs(schema, Map.empty, Literal(jsonData1), gmtId, true),
InternalRow(c.getTimeInMillis * 1000L)
)
// The result doesn't change because the json string includes timezone string ("Z" here),
// which means the string represents the timestamp string in the timezone regardless of
// the timeZoneId parameter.
checkEvaluation(
JsonToStructs(schema, Map.empty, Literal(jsonData1), Option("PST")),
JsonToStructs(schema, Map.empty, Literal(jsonData1), Option("PST"), true),
InternalRow(c.getTimeInMillis * 1000L)
)

Expand All @@ -512,7 +512,8 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
schema,
Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss"),
Literal(jsonData2),
Option(tz.getID)),
Option(tz.getID),
true),
InternalRow(c.getTimeInMillis * 1000L)
)
checkEvaluation(
Expand All @@ -521,7 +522,8 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss",
DateTimeUtils.TIMEZONE_OPTION -> tz.getID),
Literal(jsonData2),
gmtId),
gmtId,
true),
InternalRow(c.getTimeInMillis * 1000L)
)
}
Expand All @@ -530,7 +532,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
test("SPARK-19543: from_json empty input column") {
val schema = StructType(StructField("a", IntegerType) :: Nil)
checkEvaluation(
JsonToStructs(schema, Map.empty, Literal.create(" ", StringType), gmtId),
JsonToStructs(schema, Map.empty, Literal.create(" ", StringType), gmtId, true),
null
)
}
Expand Down Expand Up @@ -685,27 +687,23 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with

test("from_json missing fields") {
for (forceJsonNullableSchema <- Seq(false, true)) {
withSQLConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA.key -> forceJsonNullableSchema.toString) {
val input =
"""{
| "a": 1,
| "c": "foo"
|}
|""".stripMargin
val jsonSchema = new StructType()
.add("a", LongType, nullable = false)
.add("b", StringType, nullable = false)
.add("c", StringType, nullable = false)
val output = InternalRow(1L, null, UTF8String.fromString("foo"))
checkEvaluation(
JsonToStructs(jsonSchema, Map.empty, Literal.create(input, StringType), gmtId),
output
)
val schema = JsonToStructs(jsonSchema, Map.empty, Literal.create(input, StringType), gmtId)
.dataType
val schemaToCompare = if (forceJsonNullableSchema) jsonSchema.asNullable else jsonSchema
assert(schemaToCompare == schema)
}
val input =
"""{
| "a": 1,
| "c": "foo"
|}
|""".stripMargin
val jsonSchema = new StructType()
.add("a", LongType, nullable = false)
.add("b", StringType, nullable = false)
.add("c", StringType, nullable = false)
val output = InternalRow(1L, null, UTF8String.fromString("foo"))
val expr = JsonToStructs(
jsonSchema, Map.empty, Literal.create(input, StringType), gmtId, forceJsonNullableSchema)
checkEvaluation(expr, output)
val schema = expr.dataType
val schemaToCompare = if (forceJsonNullableSchema) jsonSchema.asNullable else jsonSchema
assert(schemaToCompare == schema)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3179,7 +3179,7 @@ object functions {
* @since 2.2.0
*/
def from_json(e: Column, schema: DataType, options: Map[String, String]): Column = withExpr {
JsonToStructs(schema, options, e.expr)
new JsonToStructs(schema, options, e.expr)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ select to_json()
struct<>
-- !query 12 output
org.apache.spark.sql.AnalysisException
Invalid number of arguments for function to_json. Expected: one of 1, 2 and 3; Found: 0; line 1 pos 7
Invalid number of arguments for function to_json. Expected: one of 1 and 2; Found: 0; line 1 pos 7


-- !query 13
Expand Down Expand Up @@ -225,7 +225,7 @@ select from_json()
struct<>
-- !query 21 output
org.apache.spark.sql.AnalysisException
Invalid number of arguments for function from_json. Expected: one of 2, 3 and 4; Found: 0; line 1 pos 7
Invalid number of arguments for function from_json. Expected: one of 2 and 3; Found: 0; line 1 pos 7


-- !query 22
Expand Down

0 comments on commit 96a5001

Please sign in to comment.