Skip to content

Commit

Permalink
fix: include path of field that causes JSON deserialization error (#4249
Browse files Browse the repository at this point in the history
)

Fixes: #4238

For complex types, the current deserialization error can be a bit lacking, as it does not include details of which field causes the error.  This commit enhances the JSON deserializer to include the JSON path of where the error occurred in the error message.

The design keep the happy path quick by using the call stack to maintain the stack of fields, rather than tracking them in some separate stack in memory. Figured this was worth while for such a performance sensitive area of the code.  There's no detectable perf hit from this change.

I've also increased the times on the `SerdeBenchmark`, as from my own experience 10 seconds is not enough time for the JVM to optimise the byte code.

Perf test on JSON deserialization before changes:

```
# JMH version: 1.21
# VM version: JDK 1.8.0_162, Java HotSpot(TM) 64-Bit Server VM, 25.162-b12
# VM invoker: /Library/Java/JavaVirtualMachines/jdk1.8.0_162.jdk/Contents/Home/jre/bin/java
# VM options: -javaagent:/Applications/IntelliJ IDEA.app/Contents/lib/idea_rt.jar=58499:/Applications/IntelliJ IDEA.app/Contents/bin -Dfile.encoding=UTF-8
# Warmup: 6 iterations, 30 s each
# Measurement: 3 iterations, 60 s each
# Timeout: 10 min per iteration
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Average time, time/op
# Benchmark: io.confluent.ksql.benchmark.SerdeBenchmark.deserialize
# Parameters: (schemaName = metrics, serializationFormat = JSON)

# Run progress: 0.00% complete, ETA 00:06:00
# Fork: 1 of 1
# Warmup Iteration   1: 5.188 us/op
# Warmup Iteration   2: 5.124 us/op
# Warmup Iteration   3: 5.193 us/op
# Warmup Iteration   4: 5.529 us/op
# Warmup Iteration   5: 5.378 us/op
# Warmup Iteration   6: 5.266 us/op
Iteration   1: 5.209 us/op
Iteration   2: 5.183 us/op
Iteration   3: 5.274 us/op

Result "io.confluent.ksql.benchmark.SerdeBenchmark.deserialize":
  5.222 ±(99.9%) 0.854 us/op [Average]
  (min, avg, max) = (5.183, 5.222, 5.274), stdev = 0.047
  CI (99.9%): [4.368, 6.075] (assumes normal distribution)

# Run complete. Total time: 00:06:01

REMEMBER: The numbers below are just data. To gain reusable insights, you need to follow up on
why the numbers are the way they are. Use profilers (see -prof, -lprof), design factorial
experiments, perform baseline and negative tests that provide experimental control, make sure
the benchmarking environment is safe on JVM/OS/HW level, ask for reviews from the domain experts.
Do not assume the numbers tell you what you want them to tell.

Benchmark                   (schemaName)  (serializationFormat)  Mode  Cnt  Score   Error  Units
SerdeBenchmark.deserialize       metrics                   JSON  avgt    3  5.222 ± 0.854  us/op
 ```

After:

```
# JMH version: 1.21
# VM version: JDK 1.8.0_162, Java HotSpot(TM) 64-Bit Server VM, 25.162-b12
# VM invoker: /Library/Java/JavaVirtualMachines/jdk1.8.0_162.jdk/Contents/Home/jre/bin/java
# VM options: -javaagent:/Applications/IntelliJ IDEA.app/Contents/lib/idea_rt.jar=60943:/Applications/IntelliJ IDEA.app/Contents/bin -Dfile.encoding=UTF-8
# Warmup: 6 iterations, 30 s each
# Measurement: 3 iterations, 60 s each
# Timeout: 10 min per iteration
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Average time, time/op
# Benchmark: io.confluent.ksql.benchmark.SerdeBenchmark.deserialize
# Parameters: (schemaName = metrics, serializationFormat = JSON)

# Run progress: 0.00% complete, ETA 00:06:00
# Fork: 1 of 1
# Warmup Iteration   1: 5.284 us/op
# Warmup Iteration   2: 5.114 us/op
# Warmup Iteration   3: 5.185 us/op
# Warmup Iteration   4: 5.105 us/op
# Warmup Iteration   5: 5.171 us/op
# Warmup Iteration   6: 5.119 us/op
Iteration   1: 5.208 us/op
Iteration   2: 5.248 us/op
Iteration   3: 5.198 us/op

Result "io.confluent.ksql.benchmark.SerdeBenchmark.deserialize":
  5.218 ±(99.9%) 0.477 us/op [Average]
  (min, avg, max) = (5.198, 5.218, 5.248), stdev = 0.026
  CI (99.9%): [4.740, 5.695] (assumes normal distribution)

# Run complete. Total time: 00:06:02

REMEMBER: The numbers below are just data. To gain reusable insights, you need to follow up on
why the numbers are the way they are. Use profilers (see -prof, -lprof), design factorial
experiments, perform baseline and negative tests that provide experimental control, make sure
the benchmarking environment is safe on JVM/OS/HW level, ask for reviews from the domain experts.
Do not assume the numbers tell you what you want them to tell.

Benchmark                   (schemaName)  (serializationFormat)  Mode  Cnt  Score   Error  Units
SerdeBenchmark.deserialize       metrics                   JSON  avgt    3  5.218 ± 0.477  us/op

```
  • Loading branch information
big-andy-coates authored Jan 10, 2020
1 parent 1281ab2 commit 5cc718b
Show file tree
Hide file tree
Showing 4 changed files with 230 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.CommandLineOptions;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

Expand All @@ -68,8 +68,8 @@
*/
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@Warmup(iterations = 3, time = 10)
@Measurement(iterations = 3, time = 10)
@Warmup(iterations = 6, time = 30)
@Measurement(iterations = 3, time = 60)
@Threads(4)
@Fork(3)
public class SerdeBenchmark {
Expand Down Expand Up @@ -192,20 +192,26 @@ private static Serde<GenericRow> getGenericRowSerde(
}
}

@SuppressWarnings("MethodMayBeStatic") // Tests can not be static
@Benchmark
public byte[] serialize(final SerdeState serdeState) {
return serdeState.serializer.serialize(TOPIC_NAME, serdeState.row);
}

@SuppressWarnings("MethodMayBeStatic") // Tests can not be static
@Benchmark
public GenericRow deserialize(final SerdeState serdeState) {
return serdeState.deserializer.deserialize(TOPIC_NAME, serdeState.bytes);
}

public static void main(final String[] args) throws RunnerException {
final Options opt = new OptionsBuilder()
.include(SerdeBenchmark.class.getSimpleName())
.build();
public static void main(final String[] args) throws Exception {

final Options opt = args.length != 0
? new CommandLineOptions(args)
: new OptionsBuilder()
.include(SerdeBenchmark.class.getSimpleName())
.shouldFailOnError(true)
.build();

new Runner(opt).run();
}
Expand Down
52 changes: 52 additions & 0 deletions ksql-benchmark/src/main/resources/log4j.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#
# Copyright 2019 Confluent Inc.
#
# Licensed under the Confluent Community License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy of the
# License at
#
# http://www.confluent.io/confluent-community-license
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#

log4j.rootLogger=INFO,stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n

# Disable INFO logs from Config classes, which log out their config on each creation:
log4j.logger.io.confluent.connect.avro.AvroConverterConfig=WARN
log4j.logger.io.confluent.connect.avro.AvroDataConfig=WARN
log4j.logger.io.confluent.kafka.serializers.KafkaAvroDeserializerConfig=WARN
log4j.logger.io.confluent.kafka.serializers.KafkaAvroSerializerConfig=WARN
log4j.logger.io.confluent.kafka.serializers.KafkaJsonDeserializerConfig=WARN
log4j.logger.io.confluent.kafka.serializers.KafkaJsonSerializerConfig=WARN
log4j.logger.io.confluent.ksql.logging.processing.ProcessingLogConfig=WARN
log4j.logger.io.confluent.ksql.rest.server.KsqlRestConfig=WARN
log4j.logger.io.confluent.ksql.util.KsqlConfig=WARN
log4j.logger.io.confluent.ksql.cli.console.CliConfig=WARN
log4j.logger.kafka.server.KafkaConfig=WARN
log4j.logger.org.apache.kafka.clients.admin.AdminClientConfig=WARN
log4j.logger.org.apache.kafka.clients.consumer.ConsumerConfig=WARN
log4j.logger.org.apache.kafka.clients.producer.ProducerConfig=WARN
log4j.logger.org.apache.kafka.connect.json.JsonConverterConfig=WARN
log4j.logger.org.apache.kafka.streams.StreamsConfig=WARN

# Disable INFO logging from the UDF loader, which logs every UDF ever time it runs:
log4j.logger.io.confluent.ksql.function.UdfLoader=WARN

# Disable logging of state transitions in KS:
log4j.logger.org.apache.kafka.streams.KafkaStreams=WARN
log4j.logger.org.apache.kafka.streams.processor.internals.StreamThread=WARN
log4j.logger.org.apache.kafka.streams.state.internals.RocksDBTimestampedStore=WARN

# Disable logging of App info
log4j.logger.org.apache.kafka.common.utils.AppInfoParser=WARN

# Disable logging from reflections warning for connect classpath scans
log4j.logger.org.reflections=ERROR
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,10 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Streams;
import io.confluent.ksql.schema.connect.SqlSchemaFormatter;
import io.confluent.ksql.schema.ksql.PersistenceSchema;
import io.confluent.ksql.util.DecimalUtil;
import io.confluent.ksql.util.KsqlException;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -46,12 +44,12 @@
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Schema.Type;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling
@SuppressWarnings("UnstableApiUsage")
public class KsqlJsonDeserializer implements Deserializer<Object> {
// CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling

Expand All @@ -60,6 +58,9 @@ public class KsqlJsonDeserializer implements Deserializer<Object> {
private static final ObjectMapper MAPPER = new ObjectMapper()
.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);

private static final Schema STRING_ARRAY = SchemaBuilder
.array(Schema.OPTIONAL_STRING_SCHEMA).build();

private static final Map<Schema.Type, Function<JsonValueContext, Object>> HANDLERS = ImmutableMap
.<Schema.Type, Function<JsonValueContext, Object>>builder()
.put(Type.BOOLEAN, context -> JsonSerdeUtils.toBoolean(context.val))
Expand All @@ -74,6 +75,7 @@ public class KsqlJsonDeserializer implements Deserializer<Object> {
.build();

private final PersistenceSchema physicalSchema;
private String target = "?";

public KsqlJsonDeserializer(
final PersistenceSchema physicalSchema
Expand All @@ -82,55 +84,53 @@ public KsqlJsonDeserializer(
}

@Override
public void configure(final Map<String, ?> map, final boolean b) {
public void configure(final Map<String, ?> map, final boolean isKey) {
this.target = isKey ? "key" : "value";
}

@Override
public Object deserialize(final String topic, final byte[] bytes) {
try {
final Object value = deserialize(bytes);
final JsonNode value = bytes == null
? null
: MAPPER.readTree(bytes);

final Object coerced = enforceFieldType(
"$",
new JsonValueContext(value, physicalSchema.serializedSchema())
);

if (LOG.isTraceEnabled()) {
LOG.trace("Deserialized value. topic:{}, row:{}", topic, value);
LOG.trace("Deserialized {}. topic:{}, row:{}", target, topic, coerced);
}
return value;

return coerced;
} catch (final Exception e) {
throw new SerializationException(
"Error deserializing JSON message from topic: " + topic, e);
}
}

private Object deserialize(final byte[] bytes) {
try {
if (bytes == null) {
return null;
}

final JsonNode value = MAPPER.readTree(bytes);
return enforceFieldType(this, physicalSchema.serializedSchema(), value);
} catch (final IOException e) {
throw new SerializationException(e);
"mvn " + target + " from topic: " + topic, e);
}
}

private static Object enforceFieldType(
final KsqlJsonDeserializer deserializer,
final Schema schema,
final JsonNode columnVal
final String pathPart,
final JsonValueContext context
) {
return enforceFieldType(new JsonValueContext(deserializer, schema, columnVal));
}

private static Object enforceFieldType(final JsonValueContext context) {
if (context.val == null || context.val instanceof NullNode) {
return null;
}

final Function<JsonValueContext, Object> handler = HANDLERS.getOrDefault(
context.schema.type(),
type -> {
throw new KsqlException("Type is not supported: " + type);
});
return handler.apply(context);
try {
final Function<JsonValueContext, Object> handler = HANDLERS.getOrDefault(
context.schema.type(),
type -> {
throw new KsqlException("Type is not supported: " + type);
});
return handler.apply(context);
} catch (final CoercionException e) {
throw new CoercionException(e.getRawMessage(), pathPart + e.getPath(), e);
} catch (final Exception e) {
throw new CoercionException(e.getMessage(), pathPart, e);
}
}

private static String processString(final JsonValueContext context) {
Expand All @@ -142,10 +142,8 @@ private static String processString(final JsonValueContext context) {
}
}
if (context.val instanceof ArrayNode) {
return Streams.stream(context.val.elements())
.map(val -> processString(
new JsonValueContext(context.deserializer, context.schema, val)
))
return enforceElementTypeForArray(new JsonValueContext(context.val, STRING_ARRAY)).stream()
.map(Objects::toString)
.collect(Collectors.joining(", ", "[", "]"));
}
return context.val.asText();
Expand All @@ -171,10 +169,16 @@ private static List<?> enforceElementTypeForArray(final JsonValueContext context
throw invalidConversionException(context.val, context.schema);
}

int idx = 0;
final ArrayNode list = (ArrayNode) context.val;
final List<Object> array = new ArrayList<>(list.size());
for (final JsonNode item : list) {
array.add(enforceFieldType(context.deserializer, context.schema.valueSchema(), item));
final Object element = enforceFieldType(
"[" + idx++ + "]",
new JsonValueContext(item, context.schema.valueSchema())
);

array.add(element);
}
return array;
}
Expand All @@ -188,15 +192,18 @@ private static Map<String, Object> enforceKeyAndValueTypeForMap(final JsonValueC
final Map<String, Object> ksqlMap = new HashMap<>(map.size());
for (final Iterator<Entry<String, JsonNode>> it = map.fields(); it.hasNext(); ) {
final Entry<String, JsonNode> e = it.next();
ksqlMap.put(
enforceFieldType(
context.deserializer,
Schema.OPTIONAL_STRING_SCHEMA,
new TextNode(e.getKey()))
.toString(),
enforceFieldType(
context.deserializer, context.schema.valueSchema(), e.getValue())

final String key = (String) enforceFieldType(
"." + e.getKey() + ".key",
new JsonValueContext(new TextNode(e.getKey()), Schema.OPTIONAL_STRING_SCHEMA)
);

final Object value = enforceFieldType(
"." + e.getKey() + ".value",
new JsonValueContext(e.getValue(), context.schema.valueSchema())
);

ksqlMap.put(key, value);
}
return ksqlMap;
}
Expand All @@ -222,9 +229,8 @@ private static Struct enforceFieldTypesForStruct(final JsonValueContext context)
}

final Object coerced = enforceFieldType(
context.deserializer,
ksqlField.schema(),
fieldValue
"." + ksqlField.name(),
new JsonValueContext(fieldValue, ksqlField.schema())
);

columnStruct.put(ksqlField.name(), coerced);
Expand Down Expand Up @@ -257,20 +263,37 @@ private static IllegalArgumentException invalidConversionException(
);
}

private static class JsonValueContext {
private static final class JsonValueContext {

private final KsqlJsonDeserializer deserializer;
private final Schema schema;
private final JsonNode val;

JsonValueContext(
final KsqlJsonDeserializer deserializer,
final Schema schema,
final JsonNode val
final JsonNode val,
final Schema schema
) {
this.deserializer = Objects.requireNonNull(deserializer);
this.schema = Objects.requireNonNull(schema, "schema");
this.val = val;
}
}

private static final class CoercionException extends RuntimeException {

private final String path;
private final String message;

CoercionException(final String message, final String path, final Throwable cause) {
super(message + ", path: " + path, cause);
this.message = Objects.requireNonNull(message, "message");
this.path = Objects.requireNonNull(path, "path");
}

public String getRawMessage() {
return message;
}

public String getPath() {
return path;
}
}
}
Loading

0 comments on commit 5cc718b

Please sign in to comment.