fix: include path of field that causes JSON deserialization error #4249

Merged · 1 commit · Jan 10, 2020

SerdeBenchmark.java
@@ -57,7 +57,7 @@
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.CommandLineOptions;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

@@ -68,8 +68,8 @@
*/
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@Warmup(iterations = 3, time = 10)
@Measurement(iterations = 3, time = 10)
@Warmup(iterations = 6, time = 30)
Contributor:

> I've also increased the times on the SerdeBenchmark, as from my own experience 10 seconds is not enough time for the JVM to optimise the byte code.

Pardon the naivete, but isn't the point of using JMH to avoid optimizations? Why are we trying to allow the JVM time to perform optimizations? Wouldn't that be less representative of how these segments of code are used in practice?

Contributor Author:

Not at all. We want the JVM to be given time to optimise the code. That's what the warm-up is all about. When KSQL is running in production it will be running for a long time, and the JVM will optimise such code paths. So a meaningful benchmark needs to give the JVM time to do this before running tests.

Contributor:

Perhaps my understanding gap is in JVM optimizations, then. It's true that when KSQL is running in production the JVM will have time to optimize this code, but KSQL running in production wouldn't just be running this serde code over and over again. Intuitively it feels like the JVM would be able to make stronger optimizations on the serde code if that's all that's being run, compared to the serde code being run as part of other KSQL code. Is this not true / not a concern for benchmarking?
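
For scale: combined with the @Fork(3) setting below, the new settings mean each benchmark method runs for 3 forks × (6 × 30 s warm-up + 3 × 60 s measurement) = 18 minutes, versus 3 × (3 × 10 s + 3 × 10 s) = 3 minutes under the old settings.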

@Measurement(iterations = 3, time = 60)
@Threads(4)
@Fork(3)
public class SerdeBenchmark {
@@ -192,20 +192,26 @@ private static Serde<GenericRow> getGenericRowSerde(
}
}

@SuppressWarnings("MethodMayBeStatic") // Tests can not be static
@Benchmark
public byte[] serialize(final SerdeState serdeState) {
return serdeState.serializer.serialize(TOPIC_NAME, serdeState.row);
}

@SuppressWarnings("MethodMayBeStatic") // Tests can not be static
@Benchmark
public GenericRow deserialize(final SerdeState serdeState) {
return serdeState.deserializer.deserialize(TOPIC_NAME, serdeState.bytes);
}

public static void main(final String[] args) throws RunnerException {
final Options opt = new OptionsBuilder()
.include(SerdeBenchmark.class.getSimpleName())
.build();
public static void main(final String[] args) throws Exception {

final Options opt = args.length != 0
? new CommandLineOptions(args)
: new OptionsBuilder()
.include(SerdeBenchmark.class.getSimpleName())
.shouldFailOnError(true)

Contributor:

What is this argument that controls whether to fail on error? Is this documented somewhere?

Contributor Author:

Not sure where it's documented. It just means the run fails if the code under test throws an exception, which is what we want.
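
For reference, this is the programmatic form of JMH's fail-on-error switch; when the benchmark is launched with command-line arguments (the CommandLineOptions branch above), the same behaviour can be requested with the -foe flag. A hypothetical invocation (jar name illustrative):

java -jar benchmarks.jar SerdeBenchmark -foe true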

.build();

new Runner(opt).run();
}
52 changes: 52 additions & 0 deletions ksql-benchmark/src/main/resources/log4j.properties
@@ -0,0 +1,52 @@
#
# Copyright 2019 Confluent Inc.
#
# Licensed under the Confluent Community License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy of the
# License at
#
# http://www.confluent.io/confluent-community-license
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#

log4j.rootLogger=INFO,stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n

# Disable INFO logs from Config classes, which log out their config on each creation:
log4j.logger.io.confluent.connect.avro.AvroConverterConfig=WARN
log4j.logger.io.confluent.connect.avro.AvroDataConfig=WARN
log4j.logger.io.confluent.kafka.serializers.KafkaAvroDeserializerConfig=WARN
log4j.logger.io.confluent.kafka.serializers.KafkaAvroSerializerConfig=WARN
log4j.logger.io.confluent.kafka.serializers.KafkaJsonDeserializerConfig=WARN
log4j.logger.io.confluent.kafka.serializers.KafkaJsonSerializerConfig=WARN
log4j.logger.io.confluent.ksql.logging.processing.ProcessingLogConfig=WARN
log4j.logger.io.confluent.ksql.rest.server.KsqlRestConfig=WARN
log4j.logger.io.confluent.ksql.util.KsqlConfig=WARN
log4j.logger.io.confluent.ksql.cli.console.CliConfig=WARN
log4j.logger.kafka.server.KafkaConfig=WARN
log4j.logger.org.apache.kafka.clients.admin.AdminClientConfig=WARN
log4j.logger.org.apache.kafka.clients.consumer.ConsumerConfig=WARN
log4j.logger.org.apache.kafka.clients.producer.ProducerConfig=WARN
log4j.logger.org.apache.kafka.connect.json.JsonConverterConfig=WARN
log4j.logger.org.apache.kafka.streams.StreamsConfig=WARN

# Disable INFO logging from the UDF loader, which logs every UDF every time it runs:
log4j.logger.io.confluent.ksql.function.UdfLoader=WARN

# Disable logging of state transitions in KS:
log4j.logger.org.apache.kafka.streams.KafkaStreams=WARN
log4j.logger.org.apache.kafka.streams.processor.internals.StreamThread=WARN
log4j.logger.org.apache.kafka.streams.state.internals.RocksDBTimestampedStore=WARN

# Disable logging of App info
log4j.logger.org.apache.kafka.common.utils.AppInfoParser=WARN

# Disable logging from reflections warning for connect classpath scans
log4j.logger.org.reflections=ERROR
KsqlJsonDeserializer.java
@@ -25,12 +25,10 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Streams;
import io.confluent.ksql.schema.connect.SqlSchemaFormatter;
import io.confluent.ksql.schema.ksql.PersistenceSchema;
import io.confluent.ksql.util.DecimalUtil;
import io.confluent.ksql.util.KsqlException;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.HashMap;
@@ -46,12 +44,12 @@
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Schema.Type;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling
@SuppressWarnings("UnstableApiUsage")
public class KsqlJsonDeserializer implements Deserializer<Object> {
// CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling

@@ -60,6 +58,9 @@ public class KsqlJsonDeserializer implements Deserializer<Object> {
private static final ObjectMapper MAPPER = new ObjectMapper()
.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);

private static final Schema STRING_ARRAY = SchemaBuilder
.array(Schema.OPTIONAL_STRING_SCHEMA).build();

private static final Map<Schema.Type, Function<JsonValueContext, Object>> HANDLERS = ImmutableMap
.<Schema.Type, Function<JsonValueContext, Object>>builder()
.put(Type.BOOLEAN, context -> JsonSerdeUtils.toBoolean(context.val))
@@ -74,6 +75,7 @@ public class KsqlJsonDeserializer implements Deserializer<Object> {
.build();

private final PersistenceSchema physicalSchema;
private String target = "?";

public KsqlJsonDeserializer(
final PersistenceSchema physicalSchema
@@ -82,55 +84,53 @@ public KsqlJsonDeserializer(
}

@Override
public void configure(final Map<String, ?> map, final boolean b) {
public void configure(final Map<String, ?> map, final boolean isKey) {
this.target = isKey ? "key" : "value";
}

@Override
public Object deserialize(final String topic, final byte[] bytes) {
try {
final Object value = deserialize(bytes);
final JsonNode value = bytes == null
? null
: MAPPER.readTree(bytes);

final Object coerced = enforceFieldType(
"$",
new JsonValueContext(value, physicalSchema.serializedSchema())
);

if (LOG.isTraceEnabled()) {
LOG.trace("Deserialized value. topic:{}, row:{}", topic, value);
LOG.trace("Deserialized {}. topic:{}, row:{}", target, topic, coerced);
}
return value;

return coerced;
} catch (final Exception e) {
throw new SerializationException(
"Error deserializing JSON message from topic: " + topic, e);
}
}

private Object deserialize(final byte[] bytes) {
try {
if (bytes == null) {
return null;
}

final JsonNode value = MAPPER.readTree(bytes);
return enforceFieldType(this, physicalSchema.serializedSchema(), value);
} catch (final IOException e) {
throw new SerializationException(e);
"mvn " + target + " from topic: " + topic, e);
}
}

private static Object enforceFieldType(
final KsqlJsonDeserializer deserializer,
final Schema schema,
final JsonNode columnVal
final String pathPart,
final JsonValueContext context
) {
return enforceFieldType(new JsonValueContext(deserializer, schema, columnVal));
}

private static Object enforceFieldType(final JsonValueContext context) {
if (context.val == null || context.val instanceof NullNode) {
return null;
}

final Function<JsonValueContext, Object> handler = HANDLERS.getOrDefault(
context.schema.type(),
type -> {
throw new KsqlException("Type is not supported: " + type);
});
return handler.apply(context);
try {
final Function<JsonValueContext, Object> handler = HANDLERS.getOrDefault(
context.schema.type(),
type -> {
throw new KsqlException("Type is not supported: " + type);
});
return handler.apply(context);
} catch (final CoercionException e) {
throw new CoercionException(e.getRawMessage(), pathPart + e.getPath(), e);

Contributor:

Very cool! I spent some time thinking about how to implement this and didn't come up with anything nearly as nice. Glad to have had the opportunity to learn from you :)

Contributor Author:

:)

} catch (final Exception e) {
throw new CoercionException(e.getMessage(), pathPart, e);
}
}
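
To make the path mechanism concrete, here is a minimal, self-contained sketch of the same accumulation pattern (class and method names are illustrative, not the PR's API): each recursive frame catches the typed exception thrown by the level below, prepends its own path segment, and rethrows, so the outermost caller sees the full path from the root "$".

import java.util.Arrays;
import java.util.List;

public final class PathDemo {

  // Mirrors the PR's CoercionException: carries the raw message and the path so far.
  static final class CoercionException extends RuntimeException {
    final String raw;
    final String path;

    CoercionException(final String raw, final String path, final Throwable cause) {
      super(raw + ", path: " + path, cause);
      this.raw = raw;
      this.path = path;
    }
  }

  // Each frame contributes its own segment: structs would add ".field", arrays "[i]".
  static Object coerce(final String pathPart, final Object val) {
    try {
      if (val instanceof List) {
        int idx = 0;
        for (final Object element : (List<?>) val) {
          coerce("[" + idx++ + "]", element);
        }
        return val;
      }
      return Integer.parseInt(String.valueOf(val)); // leaf coercion may throw
    } catch (final CoercionException e) {
      // Re-wrap: prepend this frame's segment to the path accumulated below.
      throw new CoercionException(e.raw, pathPart + e.path, e);
    } catch (final Exception e) {
      throw new CoercionException(e.getMessage(), pathPart, e);
    }
  }

  public static void main(final String[] args) {
    try {
      coerce("$", Arrays.asList(1, "oops"));
    } catch (final CoercionException e) {
      System.out.println(e.getMessage()); // For input string: "oops", path: $[1]
    }
  }
}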

private static String processString(final JsonValueContext context) {
@@ -142,10 +142,8 @@ private static String processString(final JsonValueContext context) {
}
}
if (context.val instanceof ArrayNode) {
return Streams.stream(context.val.elements())
.map(val -> processString(
new JsonValueContext(context.deserializer, context.schema, val)
))
return enforceElementTypeForArray(new JsonValueContext(context.val, STRING_ARRAY)).stream()
.map(Objects::toString)
.collect(Collectors.joining(", ", "[", "]"));
}
return context.val.asText();
@@ -171,10 +169,16 @@ private static List<?> enforceElementTypeForArray(final JsonValueContext context) {
throw invalidConversionException(context.val, context.schema);
}

int idx = 0;
final ArrayNode list = (ArrayNode) context.val;
final List<Object> array = new ArrayList<>(list.size());
for (final JsonNode item : list) {
array.add(enforceFieldType(context.deserializer, context.schema.valueSchema(), item));
final Object element = enforceFieldType(
"[" + idx++ + "]",
new JsonValueContext(item, context.schema.valueSchema())
);

array.add(element);
}
return array;
}
@@ -188,15 +192,18 @@ private static Map<String, Object> enforceKeyAndValueTypeForMap(final JsonValueContext context) {
final Map<String, Object> ksqlMap = new HashMap<>(map.size());
for (final Iterator<Entry<String, JsonNode>> it = map.fields(); it.hasNext(); ) {
final Entry<String, JsonNode> e = it.next();
ksqlMap.put(
enforceFieldType(
context.deserializer,
Schema.OPTIONAL_STRING_SCHEMA,
new TextNode(e.getKey()))
.toString(),
enforceFieldType(
context.deserializer, context.schema.valueSchema(), e.getValue())

final String key = (String) enforceFieldType(
"." + e.getKey() + ".key",
new JsonValueContext(new TextNode(e.getKey()), Schema.OPTIONAL_STRING_SCHEMA)
);

final Object value = enforceFieldType(
"." + e.getKey() + ".value",
new JsonValueContext(e.getValue(), context.schema.valueSchema())
);

ksqlMap.put(key, value);
}
return ksqlMap;
}
@@ -222,9 +229,8 @@ private static Struct enforceFieldTypesForStruct(final JsonValueContext context) {
}

final Object coerced = enforceFieldType(
context.deserializer,
ksqlField.schema(),
fieldValue
"." + ksqlField.name(),
new JsonValueContext(fieldValue, ksqlField.schema())
);

columnStruct.put(ksqlField.name(), coerced);
@@ -257,20 +263,37 @@ private static IllegalArgumentException invalidConversionException(
);
}

private static class JsonValueContext {
private static final class JsonValueContext {

private final KsqlJsonDeserializer deserializer;
private final Schema schema;
private final JsonNode val;

JsonValueContext(
final KsqlJsonDeserializer deserializer,
final Schema schema,
final JsonNode val
final JsonNode val,
final Schema schema
) {
this.deserializer = Objects.requireNonNull(deserializer);
this.schema = Objects.requireNonNull(schema, "schema");
this.val = val;
}
}

private static final class CoercionException extends RuntimeException {

private final String path;
private final String message;

CoercionException(final String message, final String path, final Throwable cause) {
super(message + ", path: " + path, cause);
this.message = Objects.requireNonNull(message, "message");
this.path = Objects.requireNonNull(path, "path");
}

public String getRawMessage() {
return message;
}

public String getPath() {
return path;
}
}
}