diff --git a/ksql-benchmark/src/main/java/io/confluent/ksql/benchmark/SerdeBenchmark.java b/ksql-benchmark/src/main/java/io/confluent/ksql/benchmark/SerdeBenchmark.java index 7dabb9ba9a21..3d8b1e7f39d3 100644 --- a/ksql-benchmark/src/main/java/io/confluent/ksql/benchmark/SerdeBenchmark.java +++ b/ksql-benchmark/src/main/java/io/confluent/ksql/benchmark/SerdeBenchmark.java @@ -57,7 +57,7 @@ import org.openjdk.jmh.annotations.Threads; import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.runner.Runner; -import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.CommandLineOptions; import org.openjdk.jmh.runner.options.Options; import org.openjdk.jmh.runner.options.OptionsBuilder; @@ -68,8 +68,8 @@ */ @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MICROSECONDS) -@Warmup(iterations = 3, time = 10) -@Measurement(iterations = 3, time = 10) +@Warmup(iterations = 6, time = 30) +@Measurement(iterations = 3, time = 60) @Threads(4) @Fork(3) public class SerdeBenchmark { @@ -192,20 +192,26 @@ private static Serde getGenericRowSerde( } } + @SuppressWarnings("MethodMayBeStatic") // Tests can not be static @Benchmark public byte[] serialize(final SerdeState serdeState) { return serdeState.serializer.serialize(TOPIC_NAME, serdeState.row); } + @SuppressWarnings("MethodMayBeStatic") // Tests can not be static @Benchmark public GenericRow deserialize(final SerdeState serdeState) { return serdeState.deserializer.deserialize(TOPIC_NAME, serdeState.bytes); } - public static void main(final String[] args) throws RunnerException { - final Options opt = new OptionsBuilder() - .include(SerdeBenchmark.class.getSimpleName()) - .build(); + public static void main(final String[] args) throws Exception { + + final Options opt = args.length != 0 + ? new CommandLineOptions(args) + : new OptionsBuilder() + .include(SerdeBenchmark.class.getSimpleName()) + .shouldFailOnError(true) + .build(); new Runner(opt).run(); } diff --git a/ksql-benchmark/src/main/resources/log4j.properties b/ksql-benchmark/src/main/resources/log4j.properties new file mode 100644 index 000000000000..75bd597e7b9d --- /dev/null +++ b/ksql-benchmark/src/main/resources/log4j.properties @@ -0,0 +1,52 @@ +# +# Copyright 2019 Confluent Inc. +# +# Licensed under the Confluent Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# http://www.confluent.io/confluent-community-license +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +log4j.rootLogger=INFO,stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n + +# Disable INFO logs from Config classes, which log out their config on each creation: +log4j.logger.io.confluent.connect.avro.AvroConverterConfig=WARN +log4j.logger.io.confluent.connect.avro.AvroDataConfig=WARN +log4j.logger.io.confluent.kafka.serializers.KafkaAvroDeserializerConfig=WARN +log4j.logger.io.confluent.kafka.serializers.KafkaAvroSerializerConfig=WARN +log4j.logger.io.confluent.kafka.serializers.KafkaJsonDeserializerConfig=WARN +log4j.logger.io.confluent.kafka.serializers.KafkaJsonSerializerConfig=WARN +log4j.logger.io.confluent.ksql.logging.processing.ProcessingLogConfig=WARN +log4j.logger.io.confluent.ksql.rest.server.KsqlRestConfig=WARN +log4j.logger.io.confluent.ksql.util.KsqlConfig=WARN +log4j.logger.io.confluent.ksql.cli.console.CliConfig=WARN +log4j.logger.kafka.server.KafkaConfig=WARN +log4j.logger.org.apache.kafka.clients.admin.AdminClientConfig=WARN +log4j.logger.org.apache.kafka.clients.consumer.ConsumerConfig=WARN +log4j.logger.org.apache.kafka.clients.producer.ProducerConfig=WARN +log4j.logger.org.apache.kafka.connect.json.JsonConverterConfig=WARN +log4j.logger.org.apache.kafka.streams.StreamsConfig=WARN + +# Disable INFO logging from the UDF loader, which logs every UDF ever time it runs: +log4j.logger.io.confluent.ksql.function.UdfLoader=WARN + +# Disable logging of state transitions in KS: +log4j.logger.org.apache.kafka.streams.KafkaStreams=WARN +log4j.logger.org.apache.kafka.streams.processor.internals.StreamThread=WARN +log4j.logger.org.apache.kafka.streams.state.internals.RocksDBTimestampedStore=WARN + +# Disable logging of App info +log4j.logger.org.apache.kafka.common.utils.AppInfoParser=WARN + +# Disable logging from reflections warning for connect classpath scans +log4j.logger.org.reflections=ERROR diff --git a/ksql-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonDeserializer.java b/ksql-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonDeserializer.java index 39a62b458c2b..bdebce895ce8 100644 --- a/ksql-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonDeserializer.java +++ b/ksql-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonDeserializer.java @@ -25,12 +25,10 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.TextNode; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Streams; import io.confluent.ksql.schema.connect.SqlSchemaFormatter; import io.confluent.ksql.schema.ksql.PersistenceSchema; import io.confluent.ksql.util.DecimalUtil; import io.confluent.ksql.util.KsqlException; -import java.io.IOException; import java.math.BigDecimal; import java.util.ArrayList; import java.util.HashMap; @@ -46,12 +44,12 @@ import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema.Type; +import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import org.slf4j.Logger; import org.slf4j.LoggerFactory; // CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling -@SuppressWarnings("UnstableApiUsage") public class KsqlJsonDeserializer implements Deserializer { // CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling @@ -60,6 +58,9 @@ public class KsqlJsonDeserializer implements Deserializer { private static final ObjectMapper MAPPER = new ObjectMapper() .enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS); + private static final Schema STRING_ARRAY = SchemaBuilder + .array(Schema.OPTIONAL_STRING_SCHEMA).build(); + private static final Map> HANDLERS = ImmutableMap .>builder() .put(Type.BOOLEAN, context -> JsonSerdeUtils.toBoolean(context.val)) @@ -74,6 +75,7 @@ public class KsqlJsonDeserializer implements Deserializer { .build(); private final PersistenceSchema physicalSchema; + private String target = "?"; public KsqlJsonDeserializer( final PersistenceSchema physicalSchema @@ -82,55 +84,53 @@ public KsqlJsonDeserializer( } @Override - public void configure(final Map map, final boolean b) { + public void configure(final Map map, final boolean isKey) { + this.target = isKey ? "key" : "value"; } @Override public Object deserialize(final String topic, final byte[] bytes) { try { - final Object value = deserialize(bytes); + final JsonNode value = bytes == null + ? null + : MAPPER.readTree(bytes); + + final Object coerced = enforceFieldType( + "$", + new JsonValueContext(value, physicalSchema.serializedSchema()) + ); + if (LOG.isTraceEnabled()) { - LOG.trace("Deserialized value. topic:{}, row:{}", topic, value); + LOG.trace("Deserialized {}. topic:{}, row:{}", target, topic, coerced); } - return value; + + return coerced; } catch (final Exception e) { throw new SerializationException( - "Error deserializing JSON message from topic: " + topic, e); - } - } - - private Object deserialize(final byte[] bytes) { - try { - if (bytes == null) { - return null; - } - - final JsonNode value = MAPPER.readTree(bytes); - return enforceFieldType(this, physicalSchema.serializedSchema(), value); - } catch (final IOException e) { - throw new SerializationException(e); + "mvn " + target + " from topic: " + topic, e); } } private static Object enforceFieldType( - final KsqlJsonDeserializer deserializer, - final Schema schema, - final JsonNode columnVal + final String pathPart, + final JsonValueContext context ) { - return enforceFieldType(new JsonValueContext(deserializer, schema, columnVal)); - } - - private static Object enforceFieldType(final JsonValueContext context) { if (context.val == null || context.val instanceof NullNode) { return null; } - final Function handler = HANDLERS.getOrDefault( - context.schema.type(), - type -> { - throw new KsqlException("Type is not supported: " + type); - }); - return handler.apply(context); + try { + final Function handler = HANDLERS.getOrDefault( + context.schema.type(), + type -> { + throw new KsqlException("Type is not supported: " + type); + }); + return handler.apply(context); + } catch (final CoercionException e) { + throw new CoercionException(e.getRawMessage(), pathPart + e.getPath(), e); + } catch (final Exception e) { + throw new CoercionException(e.getMessage(), pathPart, e); + } } private static String processString(final JsonValueContext context) { @@ -142,10 +142,8 @@ private static String processString(final JsonValueContext context) { } } if (context.val instanceof ArrayNode) { - return Streams.stream(context.val.elements()) - .map(val -> processString( - new JsonValueContext(context.deserializer, context.schema, val) - )) + return enforceElementTypeForArray(new JsonValueContext(context.val, STRING_ARRAY)).stream() + .map(Objects::toString) .collect(Collectors.joining(", ", "[", "]")); } return context.val.asText(); @@ -171,10 +169,16 @@ private static List enforceElementTypeForArray(final JsonValueContext context throw invalidConversionException(context.val, context.schema); } + int idx = 0; final ArrayNode list = (ArrayNode) context.val; final List array = new ArrayList<>(list.size()); for (final JsonNode item : list) { - array.add(enforceFieldType(context.deserializer, context.schema.valueSchema(), item)); + final Object element = enforceFieldType( + "[" + idx++ + "]", + new JsonValueContext(item, context.schema.valueSchema()) + ); + + array.add(element); } return array; } @@ -188,15 +192,18 @@ private static Map enforceKeyAndValueTypeForMap(final JsonValueC final Map ksqlMap = new HashMap<>(map.size()); for (final Iterator> it = map.fields(); it.hasNext(); ) { final Entry e = it.next(); - ksqlMap.put( - enforceFieldType( - context.deserializer, - Schema.OPTIONAL_STRING_SCHEMA, - new TextNode(e.getKey())) - .toString(), - enforceFieldType( - context.deserializer, context.schema.valueSchema(), e.getValue()) + + final String key = (String) enforceFieldType( + "." + e.getKey() + ".key", + new JsonValueContext(new TextNode(e.getKey()), Schema.OPTIONAL_STRING_SCHEMA) ); + + final Object value = enforceFieldType( + "." + e.getKey() + ".value", + new JsonValueContext(e.getValue(), context.schema.valueSchema()) + ); + + ksqlMap.put(key, value); } return ksqlMap; } @@ -222,9 +229,8 @@ private static Struct enforceFieldTypesForStruct(final JsonValueContext context) } final Object coerced = enforceFieldType( - context.deserializer, - ksqlField.schema(), - fieldValue + "." + ksqlField.name(), + new JsonValueContext(fieldValue, ksqlField.schema()) ); columnStruct.put(ksqlField.name(), coerced); @@ -257,20 +263,37 @@ private static IllegalArgumentException invalidConversionException( ); } - private static class JsonValueContext { + private static final class JsonValueContext { - private final KsqlJsonDeserializer deserializer; private final Schema schema; private final JsonNode val; JsonValueContext( - final KsqlJsonDeserializer deserializer, - final Schema schema, - final JsonNode val + final JsonNode val, + final Schema schema ) { - this.deserializer = Objects.requireNonNull(deserializer); this.schema = Objects.requireNonNull(schema, "schema"); this.val = val; } } + + private static final class CoercionException extends RuntimeException { + + private final String path; + private final String message; + + CoercionException(final String message, final String path, final Throwable cause) { + super(message + ", path: " + path, cause); + this.message = Objects.requireNonNull(message, "message"); + this.path = Objects.requireNonNull(path, "path"); + } + + public String getRawMessage() { + return message; + } + + public String getPath() { + return path; + } + } } \ No newline at end of file diff --git a/ksql-serde/src/test/java/io/confluent/ksql/serde/json/KsqlJsonDeserializerTest.java b/ksql-serde/src/test/java/io/confluent/ksql/serde/json/KsqlJsonDeserializerTest.java index adf3ad624d56..3813fb76fd54 100644 --- a/ksql-serde/src/test/java/io/confluent/ksql/serde/json/KsqlJsonDeserializerTest.java +++ b/ksql-serde/src/test/java/io/confluent/ksql/serde/json/KsqlJsonDeserializerTest.java @@ -17,9 +17,11 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.startsWith; import static org.junit.Assert.fail; import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage; @@ -179,7 +181,7 @@ public void shouldThrowIfFieldCanNotBeCoerced() { // Then: expectedException.expect(SerializationException.class); - expectedException.expectCause(hasMessage(is( + expectedException.expectCause(hasMessage(startsWith( "Can't convert type. sourceType: BooleanNode, requiredType: BIGINT"))); // When: @@ -226,13 +228,17 @@ public void shouldDeserializeNullAsNull() { @Test public void shouldTreatNullAsNull() { // Given: + final HashMap mapValue = new HashMap<>(); + mapValue.put("a", 1.0); + mapValue.put("b", null); + final Map row = new HashMap<>(); row.put("ordertime", null); row.put("@orderid", null); row.put("itemid", null); row.put("orderunits", null); row.put("arrayCol", new Double[]{0.0, null}); - row.put("mapCol", null); + row.put("mapCol", mapValue); final byte[] bytes = serializeJson(row); @@ -246,7 +252,7 @@ public void shouldTreatNullAsNull() { .put(ITEMID, null) .put(ORDERUNITS, null) .put(ARRAYCOL, Arrays.asList(0.0, null)) - .put(MAPCOL, null) + .put(MAPCOL, mapValue) .put(CASE_SENSITIVE_FIELD, null) )); } @@ -303,7 +309,7 @@ public void shouldThrowIfCanNotCoerceToBoolean() { // Then: expectedException.expect(SerializationException.class); - expectedException.expectCause(hasMessage(is( + expectedException.expectCause(hasMessage(startsWith( "Can't convert type. sourceType: IntNode, requiredType: BOOLEAN"))); // When: @@ -342,7 +348,7 @@ public void shouldThrowIfCanNotCoerceToInt() { // Then: expectedException.expect(SerializationException.class); - expectedException.expectCause(hasMessage(is( + expectedException.expectCause(hasMessage(startsWith( "Can't convert type. sourceType: BooleanNode, requiredType: INTEGER"))); // When: @@ -382,7 +388,7 @@ public void shouldThrowIfCanNotCoerceToBigInt() { // Then: expectedException.expect(SerializationException.class); - expectedException.expectCause(hasMessage(is( + expectedException.expectCause(hasMessage(startsWith( "Can't convert type. sourceType: BooleanNode, requiredType: BIGINT"))); // When: @@ -422,7 +428,7 @@ public void shouldThrowIfCanNotCoerceToDouble() { // Then: expectedException.expect(SerializationException.class); - expectedException.expectCause(hasMessage(is( + expectedException.expectCause(hasMessage(startsWith( "Can't convert type. sourceType: BooleanNode, requiredType: DOUBLE"))); // When: @@ -487,7 +493,7 @@ public void shouldThrowIfCanNotCoerceToBigDecimal() { // Then: expectedException.expect(SerializationException.class); - expectedException.expectCause(hasMessage(is( + expectedException.expectCause(hasMessage(startsWith( "Can't convert type. sourceType: BooleanNode, requiredType: DECIMAL(20, 19)"))); // When: @@ -523,7 +529,7 @@ public void shouldThrowIfNotAnArray() { // Then: expectedException.expect(SerializationException.class); - expectedException.expectCause(hasMessage(is( + expectedException.expectCause(hasMessage(startsWith( "Can't convert type. sourceType: BooleanNode, requiredType: ARRAY"))); // When: @@ -544,7 +550,7 @@ public void shouldThrowIfCanNotCoerceArrayElement() { // Then: expectedException.expect(SerializationException.class); - expectedException.expectCause(hasMessage(is( + expectedException.expectCause(hasMessage(startsWith( "Can't coerce string to type. targetType: INTEGER"))); // When: @@ -580,7 +586,7 @@ public void shouldThrowIfNotAnMap() { // Then: expectedException.expect(SerializationException.class); - expectedException.expectCause(hasMessage(is( + expectedException.expectCause(hasMessage(startsWith( "Can't convert type. sourceType: BooleanNode, requiredType: MAP"))); // When: @@ -599,7 +605,7 @@ public void shouldThrowIfCanNotCoerceMapValue() { // Then: expectedException.expect(SerializationException.class); - expectedException.expectCause(hasMessage(is( + expectedException.expectCause(hasMessage(startsWith( "Can't convert type. sourceType: BooleanNode, requiredType: INTEGER"))); // When: @@ -686,6 +692,74 @@ public void shouldNotIncludeBadValueInExceptionAsThatWouldBeASecurityIssue() { } } + @Test + public void shouldIncludePathForErrorsInRootNode() { + // Given: + givenDeserializerForSchema(Schema.OPTIONAL_FLOAT64_SCHEMA); + + final byte[] bytes = "true".getBytes(StandardCharsets.UTF_8); + + // Then: + expectedException.expectCause(hasMessage(endsWith(", path: $"))); + + // When: + deserializer.deserialize(SOME_TOPIC, bytes); + } + + @Test + public void shouldIncludePathForErrorsInObjectFieldsValue() { + // Given: + final Map value = new HashMap<>(AN_ORDER); + value.put("ordertime", true); + + final byte[] bytes = serializeJson(value); + + // Then: + expectedException.expect(SerializationException.class); + expectedException.expectCause(hasMessage(endsWith(", path: $.ORDERTIME"))); + + // When: + deserializer.deserialize(SOME_TOPIC, bytes); + } + + @Test + public void shouldIncludePathForErrorsInArrayElements() { + // Given: + givenDeserializerForSchema(SchemaBuilder + .array(Schema.OPTIONAL_INT32_SCHEMA) + .build() + ); + + final List expected = ImmutableList.of(0, "not", "numbers"); + + final byte[] bytes = serializeJson(expected); + + // Then: + expectedException.expect(SerializationException.class); + expectedException.expectCause(hasMessage(endsWith("path: $[1]"))); + + // When: + deserializer.deserialize(SOME_TOPIC, bytes); + } + + @Test + public void shouldIncludePathForErrorsInMapValues() { + // Given: + givenDeserializerForSchema(SchemaBuilder + .map(Schema.OPTIONAL_STRING_SCHEMA, Schema.INT32_SCHEMA) + .build() + ); + + final byte[] bytes = serializeJson(ImmutableMap.of("a", 1, "b", true)); + + // Then: + expectedException.expect(SerializationException.class); + expectedException.expectCause(hasMessage(endsWith("path: $.b.value"))); + + // When: + deserializer.deserialize(SOME_TOPIC, bytes); + } + private void givenDeserializerForSchema(final Schema serializedSchema) { final boolean unwrap = serializedSchema.type() != Type.STRUCT; final Schema ksqlSchema = unwrap