From 4e1bd785bc300a4e892c8e477ad4c3b63abd73a4 Mon Sep 17 00:00:00 2001 From: Andy Coates Date: Wed, 8 Jan 2020 15:40:55 +0000 Subject: [PATCH] fix: include path of field that causes JSON deserialization error MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes: https://github.com/confluentinc/ksql/issues/4238 For complex types, the current deserialization error can be a bit lacking, as it does not include details of which field causes the error. This commit enhances the JSON deserializer to include the JSON path of where the error occurred in the error message. The design keep the happy path quick by using the call stack to maintain the stack of fields, rather than tracking them in some separate stack in memory. Figured this was worth while for such a performance sensitive area of the code. There's no detectable perf hit from this change. I've also increased the times on the `SerdeBenchmark`, as from my own experience 10 seconds is not enough time for the JVM to optimise the byte code. Perf test on JSON deserialization before changes: ``` # JMH version: 1.21 # VM version: JDK 1.8.0_162, Java HotSpot(TM) 64-Bit Server VM, 25.162-b12 # VM invoker: /Library/Java/JavaVirtualMachines/jdk1.8.0_162.jdk/Contents/Home/jre/bin/java # VM options: -javaagent:/Applications/IntelliJ IDEA.app/Contents/lib/idea_rt.jar=58499:/Applications/IntelliJ IDEA.app/Contents/bin -Dfile.encoding=UTF-8 # Warmup: 6 iterations, 30 s each # Measurement: 3 iterations, 60 s each # Timeout: 10 min per iteration # Threads: 1 thread, will synchronize iterations # Benchmark mode: Average time, time/op # Benchmark: io.confluent.ksql.benchmark.SerdeBenchmark.deserialize # Parameters: (schemaName = metrics, serializationFormat = JSON) # Run progress: 0.00% complete, ETA 00:06:00 # Fork: 1 of 1 # Warmup Iteration 1: 5.188 us/op # Warmup Iteration 2: 5.124 us/op # Warmup Iteration 3: 5.193 us/op # Warmup Iteration 4: 5.529 us/op # Warmup Iteration 5: 5.378 us/op # Warmup Iteration 6: 5.266 us/op Iteration 1: 5.209 us/op Iteration 2: 5.183 us/op Iteration 3: 5.274 us/op Result "io.confluent.ksql.benchmark.SerdeBenchmark.deserialize": 5.222 ±(99.9%) 0.854 us/op [Average] (min, avg, max) = (5.183, 5.222, 5.274), stdev = 0.047 CI (99.9%): [4.368, 6.075] (assumes normal distribution) # Run complete. Total time: 00:06:01 REMEMBER: The numbers below are just data. To gain reusable insights, you need to follow up on why the numbers are the way they are. Use profilers (see -prof, -lprof), design factorial experiments, perform baseline and negative tests that provide experimental control, make sure the benchmarking environment is safe on JVM/OS/HW level, ask for reviews from the domain experts. Do not assume the numbers tell you what you want them to tell. Benchmark (schemaName) (serializationFormat) Mode Cnt Score Error Units SerdeBenchmark.deserialize metrics JSON avgt 3 5.222 ± 0.854 us/op ``` After: ``` # JMH version: 1.21 # VM version: JDK 1.8.0_162, Java HotSpot(TM) 64-Bit Server VM, 25.162-b12 # VM invoker: /Library/Java/JavaVirtualMachines/jdk1.8.0_162.jdk/Contents/Home/jre/bin/java # VM options: -javaagent:/Applications/IntelliJ IDEA.app/Contents/lib/idea_rt.jar=60943:/Applications/IntelliJ IDEA.app/Contents/bin -Dfile.encoding=UTF-8 # Warmup: 6 iterations, 30 s each # Measurement: 3 iterations, 60 s each # Timeout: 10 min per iteration # Threads: 1 thread, will synchronize iterations # Benchmark mode: Average time, time/op # Benchmark: io.confluent.ksql.benchmark.SerdeBenchmark.deserialize # Parameters: (schemaName = metrics, serializationFormat = JSON) # Run progress: 0.00% complete, ETA 00:06:00 # Fork: 1 of 1 # Warmup Iteration 1: 5.284 us/op # Warmup Iteration 2: 5.114 us/op # Warmup Iteration 3: 5.185 us/op # Warmup Iteration 4: 5.105 us/op # Warmup Iteration 5: 5.171 us/op # Warmup Iteration 6: 5.119 us/op Iteration 1: 5.208 us/op Iteration 2: 5.248 us/op Iteration 3: 5.198 us/op Result "io.confluent.ksql.benchmark.SerdeBenchmark.deserialize": 5.218 ±(99.9%) 0.477 us/op [Average] (min, avg, max) = (5.198, 5.218, 5.248), stdev = 0.026 CI (99.9%): [4.740, 5.695] (assumes normal distribution) # Run complete. Total time: 00:06:02 REMEMBER: The numbers below are just data. To gain reusable insights, you need to follow up on why the numbers are the way they are. Use profilers (see -prof, -lprof), design factorial experiments, perform baseline and negative tests that provide experimental control, make sure the benchmarking environment is safe on JVM/OS/HW level, ask for reviews from the domain experts. Do not assume the numbers tell you what you want them to tell. Benchmark (schemaName) (serializationFormat) Mode Cnt Score Error Units SerdeBenchmark.deserialize metrics JSON avgt 3 5.218 ± 0.477 us/op ``` --- .../ksql/benchmark/SerdeBenchmark.java | 20 ++- .../src/main/resources/log4j.properties | 52 +++++++ .../ksql/serde/json/KsqlJsonDeserializer.java | 135 ++++++++++-------- .../serde/json/KsqlJsonDeserializerTest.java | 98 +++++++++++-- 4 files changed, 230 insertions(+), 75 deletions(-) create mode 100644 ksql-benchmark/src/main/resources/log4j.properties diff --git a/ksql-benchmark/src/main/java/io/confluent/ksql/benchmark/SerdeBenchmark.java b/ksql-benchmark/src/main/java/io/confluent/ksql/benchmark/SerdeBenchmark.java index 7dabb9ba9a21..3d8b1e7f39d3 100644 --- a/ksql-benchmark/src/main/java/io/confluent/ksql/benchmark/SerdeBenchmark.java +++ b/ksql-benchmark/src/main/java/io/confluent/ksql/benchmark/SerdeBenchmark.java @@ -57,7 +57,7 @@ import org.openjdk.jmh.annotations.Threads; import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.runner.Runner; -import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.CommandLineOptions; import org.openjdk.jmh.runner.options.Options; import org.openjdk.jmh.runner.options.OptionsBuilder; @@ -68,8 +68,8 @@ */ @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MICROSECONDS) -@Warmup(iterations = 3, time = 10) -@Measurement(iterations = 3, time = 10) +@Warmup(iterations = 6, time = 30) +@Measurement(iterations = 3, time = 60) @Threads(4) @Fork(3) public class SerdeBenchmark { @@ -192,20 +192,26 @@ private static Serde getGenericRowSerde( } } + @SuppressWarnings("MethodMayBeStatic") // Tests can not be static @Benchmark public byte[] serialize(final SerdeState serdeState) { return serdeState.serializer.serialize(TOPIC_NAME, serdeState.row); } + @SuppressWarnings("MethodMayBeStatic") // Tests can not be static @Benchmark public GenericRow deserialize(final SerdeState serdeState) { return serdeState.deserializer.deserialize(TOPIC_NAME, serdeState.bytes); } - public static void main(final String[] args) throws RunnerException { - final Options opt = new OptionsBuilder() - .include(SerdeBenchmark.class.getSimpleName()) - .build(); + public static void main(final String[] args) throws Exception { + + final Options opt = args.length != 0 + ? new CommandLineOptions(args) + : new OptionsBuilder() + .include(SerdeBenchmark.class.getSimpleName()) + .shouldFailOnError(true) + .build(); new Runner(opt).run(); } diff --git a/ksql-benchmark/src/main/resources/log4j.properties b/ksql-benchmark/src/main/resources/log4j.properties new file mode 100644 index 000000000000..75bd597e7b9d --- /dev/null +++ b/ksql-benchmark/src/main/resources/log4j.properties @@ -0,0 +1,52 @@ +# +# Copyright 2019 Confluent Inc. +# +# Licensed under the Confluent Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# http://www.confluent.io/confluent-community-license +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +log4j.rootLogger=INFO,stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n + +# Disable INFO logs from Config classes, which log out their config on each creation: +log4j.logger.io.confluent.connect.avro.AvroConverterConfig=WARN +log4j.logger.io.confluent.connect.avro.AvroDataConfig=WARN +log4j.logger.io.confluent.kafka.serializers.KafkaAvroDeserializerConfig=WARN +log4j.logger.io.confluent.kafka.serializers.KafkaAvroSerializerConfig=WARN +log4j.logger.io.confluent.kafka.serializers.KafkaJsonDeserializerConfig=WARN +log4j.logger.io.confluent.kafka.serializers.KafkaJsonSerializerConfig=WARN +log4j.logger.io.confluent.ksql.logging.processing.ProcessingLogConfig=WARN +log4j.logger.io.confluent.ksql.rest.server.KsqlRestConfig=WARN +log4j.logger.io.confluent.ksql.util.KsqlConfig=WARN +log4j.logger.io.confluent.ksql.cli.console.CliConfig=WARN +log4j.logger.kafka.server.KafkaConfig=WARN +log4j.logger.org.apache.kafka.clients.admin.AdminClientConfig=WARN +log4j.logger.org.apache.kafka.clients.consumer.ConsumerConfig=WARN +log4j.logger.org.apache.kafka.clients.producer.ProducerConfig=WARN +log4j.logger.org.apache.kafka.connect.json.JsonConverterConfig=WARN +log4j.logger.org.apache.kafka.streams.StreamsConfig=WARN + +# Disable INFO logging from the UDF loader, which logs every UDF ever time it runs: +log4j.logger.io.confluent.ksql.function.UdfLoader=WARN + +# Disable logging of state transitions in KS: +log4j.logger.org.apache.kafka.streams.KafkaStreams=WARN +log4j.logger.org.apache.kafka.streams.processor.internals.StreamThread=WARN +log4j.logger.org.apache.kafka.streams.state.internals.RocksDBTimestampedStore=WARN + +# Disable logging of App info +log4j.logger.org.apache.kafka.common.utils.AppInfoParser=WARN + +# Disable logging from reflections warning for connect classpath scans +log4j.logger.org.reflections=ERROR diff --git a/ksql-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonDeserializer.java b/ksql-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonDeserializer.java index 39a62b458c2b..bdebce895ce8 100644 --- a/ksql-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonDeserializer.java +++ b/ksql-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonDeserializer.java @@ -25,12 +25,10 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.TextNode; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Streams; import io.confluent.ksql.schema.connect.SqlSchemaFormatter; import io.confluent.ksql.schema.ksql.PersistenceSchema; import io.confluent.ksql.util.DecimalUtil; import io.confluent.ksql.util.KsqlException; -import java.io.IOException; import java.math.BigDecimal; import java.util.ArrayList; import java.util.HashMap; @@ -46,12 +44,12 @@ import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema.Type; +import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import org.slf4j.Logger; import org.slf4j.LoggerFactory; // CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling -@SuppressWarnings("UnstableApiUsage") public class KsqlJsonDeserializer implements Deserializer { // CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling @@ -60,6 +58,9 @@ public class KsqlJsonDeserializer implements Deserializer { private static final ObjectMapper MAPPER = new ObjectMapper() .enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS); + private static final Schema STRING_ARRAY = SchemaBuilder + .array(Schema.OPTIONAL_STRING_SCHEMA).build(); + private static final Map> HANDLERS = ImmutableMap .>builder() .put(Type.BOOLEAN, context -> JsonSerdeUtils.toBoolean(context.val)) @@ -74,6 +75,7 @@ public class KsqlJsonDeserializer implements Deserializer { .build(); private final PersistenceSchema physicalSchema; + private String target = "?"; public KsqlJsonDeserializer( final PersistenceSchema physicalSchema @@ -82,55 +84,53 @@ public KsqlJsonDeserializer( } @Override - public void configure(final Map map, final boolean b) { + public void configure(final Map map, final boolean isKey) { + this.target = isKey ? "key" : "value"; } @Override public Object deserialize(final String topic, final byte[] bytes) { try { - final Object value = deserialize(bytes); + final JsonNode value = bytes == null + ? null + : MAPPER.readTree(bytes); + + final Object coerced = enforceFieldType( + "$", + new JsonValueContext(value, physicalSchema.serializedSchema()) + ); + if (LOG.isTraceEnabled()) { - LOG.trace("Deserialized value. topic:{}, row:{}", topic, value); + LOG.trace("Deserialized {}. topic:{}, row:{}", target, topic, coerced); } - return value; + + return coerced; } catch (final Exception e) { throw new SerializationException( - "Error deserializing JSON message from topic: " + topic, e); - } - } - - private Object deserialize(final byte[] bytes) { - try { - if (bytes == null) { - return null; - } - - final JsonNode value = MAPPER.readTree(bytes); - return enforceFieldType(this, physicalSchema.serializedSchema(), value); - } catch (final IOException e) { - throw new SerializationException(e); + "mvn " + target + " from topic: " + topic, e); } } private static Object enforceFieldType( - final KsqlJsonDeserializer deserializer, - final Schema schema, - final JsonNode columnVal + final String pathPart, + final JsonValueContext context ) { - return enforceFieldType(new JsonValueContext(deserializer, schema, columnVal)); - } - - private static Object enforceFieldType(final JsonValueContext context) { if (context.val == null || context.val instanceof NullNode) { return null; } - final Function handler = HANDLERS.getOrDefault( - context.schema.type(), - type -> { - throw new KsqlException("Type is not supported: " + type); - }); - return handler.apply(context); + try { + final Function handler = HANDLERS.getOrDefault( + context.schema.type(), + type -> { + throw new KsqlException("Type is not supported: " + type); + }); + return handler.apply(context); + } catch (final CoercionException e) { + throw new CoercionException(e.getRawMessage(), pathPart + e.getPath(), e); + } catch (final Exception e) { + throw new CoercionException(e.getMessage(), pathPart, e); + } } private static String processString(final JsonValueContext context) { @@ -142,10 +142,8 @@ private static String processString(final JsonValueContext context) { } } if (context.val instanceof ArrayNode) { - return Streams.stream(context.val.elements()) - .map(val -> processString( - new JsonValueContext(context.deserializer, context.schema, val) - )) + return enforceElementTypeForArray(new JsonValueContext(context.val, STRING_ARRAY)).stream() + .map(Objects::toString) .collect(Collectors.joining(", ", "[", "]")); } return context.val.asText(); @@ -171,10 +169,16 @@ private static List enforceElementTypeForArray(final JsonValueContext context throw invalidConversionException(context.val, context.schema); } + int idx = 0; final ArrayNode list = (ArrayNode) context.val; final List array = new ArrayList<>(list.size()); for (final JsonNode item : list) { - array.add(enforceFieldType(context.deserializer, context.schema.valueSchema(), item)); + final Object element = enforceFieldType( + "[" + idx++ + "]", + new JsonValueContext(item, context.schema.valueSchema()) + ); + + array.add(element); } return array; } @@ -188,15 +192,18 @@ private static Map enforceKeyAndValueTypeForMap(final JsonValueC final Map ksqlMap = new HashMap<>(map.size()); for (final Iterator> it = map.fields(); it.hasNext(); ) { final Entry e = it.next(); - ksqlMap.put( - enforceFieldType( - context.deserializer, - Schema.OPTIONAL_STRING_SCHEMA, - new TextNode(e.getKey())) - .toString(), - enforceFieldType( - context.deserializer, context.schema.valueSchema(), e.getValue()) + + final String key = (String) enforceFieldType( + "." + e.getKey() + ".key", + new JsonValueContext(new TextNode(e.getKey()), Schema.OPTIONAL_STRING_SCHEMA) ); + + final Object value = enforceFieldType( + "." + e.getKey() + ".value", + new JsonValueContext(e.getValue(), context.schema.valueSchema()) + ); + + ksqlMap.put(key, value); } return ksqlMap; } @@ -222,9 +229,8 @@ private static Struct enforceFieldTypesForStruct(final JsonValueContext context) } final Object coerced = enforceFieldType( - context.deserializer, - ksqlField.schema(), - fieldValue + "." + ksqlField.name(), + new JsonValueContext(fieldValue, ksqlField.schema()) ); columnStruct.put(ksqlField.name(), coerced); @@ -257,20 +263,37 @@ private static IllegalArgumentException invalidConversionException( ); } - private static class JsonValueContext { + private static final class JsonValueContext { - private final KsqlJsonDeserializer deserializer; private final Schema schema; private final JsonNode val; JsonValueContext( - final KsqlJsonDeserializer deserializer, - final Schema schema, - final JsonNode val + final JsonNode val, + final Schema schema ) { - this.deserializer = Objects.requireNonNull(deserializer); this.schema = Objects.requireNonNull(schema, "schema"); this.val = val; } } + + private static final class CoercionException extends RuntimeException { + + private final String path; + private final String message; + + CoercionException(final String message, final String path, final Throwable cause) { + super(message + ", path: " + path, cause); + this.message = Objects.requireNonNull(message, "message"); + this.path = Objects.requireNonNull(path, "path"); + } + + public String getRawMessage() { + return message; + } + + public String getPath() { + return path; + } + } } \ No newline at end of file diff --git a/ksql-serde/src/test/java/io/confluent/ksql/serde/json/KsqlJsonDeserializerTest.java b/ksql-serde/src/test/java/io/confluent/ksql/serde/json/KsqlJsonDeserializerTest.java index adf3ad624d56..3813fb76fd54 100644 --- a/ksql-serde/src/test/java/io/confluent/ksql/serde/json/KsqlJsonDeserializerTest.java +++ b/ksql-serde/src/test/java/io/confluent/ksql/serde/json/KsqlJsonDeserializerTest.java @@ -17,9 +17,11 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.startsWith; import static org.junit.Assert.fail; import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage; @@ -179,7 +181,7 @@ public void shouldThrowIfFieldCanNotBeCoerced() { // Then: expectedException.expect(SerializationException.class); - expectedException.expectCause(hasMessage(is( + expectedException.expectCause(hasMessage(startsWith( "Can't convert type. sourceType: BooleanNode, requiredType: BIGINT"))); // When: @@ -226,13 +228,17 @@ public void shouldDeserializeNullAsNull() { @Test public void shouldTreatNullAsNull() { // Given: + final HashMap mapValue = new HashMap<>(); + mapValue.put("a", 1.0); + mapValue.put("b", null); + final Map row = new HashMap<>(); row.put("ordertime", null); row.put("@orderid", null); row.put("itemid", null); row.put("orderunits", null); row.put("arrayCol", new Double[]{0.0, null}); - row.put("mapCol", null); + row.put("mapCol", mapValue); final byte[] bytes = serializeJson(row); @@ -246,7 +252,7 @@ public void shouldTreatNullAsNull() { .put(ITEMID, null) .put(ORDERUNITS, null) .put(ARRAYCOL, Arrays.asList(0.0, null)) - .put(MAPCOL, null) + .put(MAPCOL, mapValue) .put(CASE_SENSITIVE_FIELD, null) )); } @@ -303,7 +309,7 @@ public void shouldThrowIfCanNotCoerceToBoolean() { // Then: expectedException.expect(SerializationException.class); - expectedException.expectCause(hasMessage(is( + expectedException.expectCause(hasMessage(startsWith( "Can't convert type. sourceType: IntNode, requiredType: BOOLEAN"))); // When: @@ -342,7 +348,7 @@ public void shouldThrowIfCanNotCoerceToInt() { // Then: expectedException.expect(SerializationException.class); - expectedException.expectCause(hasMessage(is( + expectedException.expectCause(hasMessage(startsWith( "Can't convert type. sourceType: BooleanNode, requiredType: INTEGER"))); // When: @@ -382,7 +388,7 @@ public void shouldThrowIfCanNotCoerceToBigInt() { // Then: expectedException.expect(SerializationException.class); - expectedException.expectCause(hasMessage(is( + expectedException.expectCause(hasMessage(startsWith( "Can't convert type. sourceType: BooleanNode, requiredType: BIGINT"))); // When: @@ -422,7 +428,7 @@ public void shouldThrowIfCanNotCoerceToDouble() { // Then: expectedException.expect(SerializationException.class); - expectedException.expectCause(hasMessage(is( + expectedException.expectCause(hasMessage(startsWith( "Can't convert type. sourceType: BooleanNode, requiredType: DOUBLE"))); // When: @@ -487,7 +493,7 @@ public void shouldThrowIfCanNotCoerceToBigDecimal() { // Then: expectedException.expect(SerializationException.class); - expectedException.expectCause(hasMessage(is( + expectedException.expectCause(hasMessage(startsWith( "Can't convert type. sourceType: BooleanNode, requiredType: DECIMAL(20, 19)"))); // When: @@ -523,7 +529,7 @@ public void shouldThrowIfNotAnArray() { // Then: expectedException.expect(SerializationException.class); - expectedException.expectCause(hasMessage(is( + expectedException.expectCause(hasMessage(startsWith( "Can't convert type. sourceType: BooleanNode, requiredType: ARRAY"))); // When: @@ -544,7 +550,7 @@ public void shouldThrowIfCanNotCoerceArrayElement() { // Then: expectedException.expect(SerializationException.class); - expectedException.expectCause(hasMessage(is( + expectedException.expectCause(hasMessage(startsWith( "Can't coerce string to type. targetType: INTEGER"))); // When: @@ -580,7 +586,7 @@ public void shouldThrowIfNotAnMap() { // Then: expectedException.expect(SerializationException.class); - expectedException.expectCause(hasMessage(is( + expectedException.expectCause(hasMessage(startsWith( "Can't convert type. sourceType: BooleanNode, requiredType: MAP"))); // When: @@ -599,7 +605,7 @@ public void shouldThrowIfCanNotCoerceMapValue() { // Then: expectedException.expect(SerializationException.class); - expectedException.expectCause(hasMessage(is( + expectedException.expectCause(hasMessage(startsWith( "Can't convert type. sourceType: BooleanNode, requiredType: INTEGER"))); // When: @@ -686,6 +692,74 @@ public void shouldNotIncludeBadValueInExceptionAsThatWouldBeASecurityIssue() { } } + @Test + public void shouldIncludePathForErrorsInRootNode() { + // Given: + givenDeserializerForSchema(Schema.OPTIONAL_FLOAT64_SCHEMA); + + final byte[] bytes = "true".getBytes(StandardCharsets.UTF_8); + + // Then: + expectedException.expectCause(hasMessage(endsWith(", path: $"))); + + // When: + deserializer.deserialize(SOME_TOPIC, bytes); + } + + @Test + public void shouldIncludePathForErrorsInObjectFieldsValue() { + // Given: + final Map value = new HashMap<>(AN_ORDER); + value.put("ordertime", true); + + final byte[] bytes = serializeJson(value); + + // Then: + expectedException.expect(SerializationException.class); + expectedException.expectCause(hasMessage(endsWith(", path: $.ORDERTIME"))); + + // When: + deserializer.deserialize(SOME_TOPIC, bytes); + } + + @Test + public void shouldIncludePathForErrorsInArrayElements() { + // Given: + givenDeserializerForSchema(SchemaBuilder + .array(Schema.OPTIONAL_INT32_SCHEMA) + .build() + ); + + final List expected = ImmutableList.of(0, "not", "numbers"); + + final byte[] bytes = serializeJson(expected); + + // Then: + expectedException.expect(SerializationException.class); + expectedException.expectCause(hasMessage(endsWith("path: $[1]"))); + + // When: + deserializer.deserialize(SOME_TOPIC, bytes); + } + + @Test + public void shouldIncludePathForErrorsInMapValues() { + // Given: + givenDeserializerForSchema(SchemaBuilder + .map(Schema.OPTIONAL_STRING_SCHEMA, Schema.INT32_SCHEMA) + .build() + ); + + final byte[] bytes = serializeJson(ImmutableMap.of("a", 1, "b", true)); + + // Then: + expectedException.expect(SerializationException.class); + expectedException.expectCause(hasMessage(endsWith("path: $.b.value"))); + + // When: + deserializer.deserialize(SOME_TOPIC, bytes); + } + private void givenDeserializerForSchema(final Schema serializedSchema) { final boolean unwrap = serializedSchema.type() != Type.STRUCT; final Schema ksqlSchema = unwrap