Skip to content

Commit

Permalink
feat: add the topic name to deserialization errors (#4573)
Browse files Browse the repository at this point in the history
* feat: add the topic name to deserialization errors
  • Loading branch information
rodesai authored Feb 20, 2020
1 parent b503d25 commit 0f7edf6
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ public final class ProcessingLogMessageSchema {
public static final String DESERIALIZATION_ERROR_FIELD_MESSAGE = "errorMessage";
public static final String DESERIALIZATION_ERROR_FIELD_RECORD_B64 = "recordB64";
public static final String DESERIALIZATION_ERROR_FIELD_CAUSE = "cause";
public static final String DESERIALIZATION_ERROR_FIELD_TOPIC = "topic";

private static final Schema DESERIALIZATION_ERROR_SCHEMA = SchemaBuilder.struct()
.name(NAMESPACE + "DeserializationError")
.field(DESERIALIZATION_ERROR_FIELD_MESSAGE, Schema.OPTIONAL_STRING_SCHEMA)
.field(DESERIALIZATION_ERROR_FIELD_RECORD_B64, Schema.OPTIONAL_STRING_SCHEMA)
.field(DESERIALIZATION_ERROR_FIELD_CAUSE, CAUSE_SCHEMA)
.field(DESERIALIZATION_ERROR_FIELD_TOPIC, Schema.OPTIONAL_STRING_SCHEMA)
.optional()
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public void shouldBuildCorrectStreamCreateDDL() {
+ "time BIGINT, "
+ "message STRUCT<"
+ "type INT, "
+ "deserializationError STRUCT<errorMessage VARCHAR, recordB64 VARCHAR, cause ARRAY<VARCHAR>>, "
+ "deserializationError STRUCT<errorMessage VARCHAR, recordB64 VARCHAR, cause ARRAY<VARCHAR>, `topic` VARCHAR>, "
+ "recordProcessingError STRUCT<errorMessage VARCHAR, record VARCHAR, cause ARRAY<VARCHAR>>, "
+ "productionError STRUCT<errorMessage VARCHAR>"
+ ">"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public T deserialize(final String topic, final byte[] bytes) {
return delegate.deserialize(topic, bytes);
} catch (final RuntimeException e) {
processingLogger.error(
SerdeProcessingLogMessageFactory.deserializationErrorMsg(e, Optional.ofNullable(bytes))
SerdeProcessingLogMessageFactory.deserializationErrorMsg(
e, Optional.ofNullable(bytes), topic)
);
throw e;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ private SerdeProcessingLogMessageFactory() {

public static Function<ProcessingLogConfig, SchemaAndValue> deserializationErrorMsg(
final Throwable exception,
final Optional<byte[]> record
final Optional<byte[]> record,
final String topic
) {
Objects.requireNonNull(exception);
return (config) -> {
Expand All @@ -48,6 +49,10 @@ public static Function<ProcessingLogConfig, SchemaAndValue> deserializationError
ProcessingLogMessageSchema.DESERIALIZATION_ERROR_FIELD_CAUSE,
cause
);
deserializationError.put(
ProcessingLogMessageSchema.DESERIALIZATION_ERROR_FIELD_TOPIC,
topic
);
if (config.getBoolean(ProcessingLogConfig.INCLUDE_ROWS)) {
deserializationError.put(
ProcessingLogMessageSchema.DESERIALIZATION_ERROR_FIELD_RECORD_B64,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public void shouldLogOnException() {
final SchemaAndValue result = errorCaptor.getValue().apply(LOG_CONFIG);

assertThat(result, is(SerdeProcessingLogMessageFactory
.deserializationErrorMsg(e, Optional.of(SOME_BYTES)).apply(LOG_CONFIG)));
.deserializationErrorMsg(e, Optional.of(SOME_BYTES), "t").apply(LOG_CONFIG)));

throw e;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,7 @@

package io.confluent.ksql.serde.util;

import static io.confluent.ksql.logging.processing.ProcessingLogMessageSchema.DESERIALIZATION_ERROR;
import static io.confluent.ksql.logging.processing.ProcessingLogMessageSchema.DESERIALIZATION_ERROR_FIELD_CAUSE;
import static io.confluent.ksql.logging.processing.ProcessingLogMessageSchema.DESERIALIZATION_ERROR_FIELD_MESSAGE;
import static io.confluent.ksql.logging.processing.ProcessingLogMessageSchema.DESERIALIZATION_ERROR_FIELD_RECORD_B64;
import static io.confluent.ksql.logging.processing.ProcessingLogMessageSchema.PROCESSING_LOG_SCHEMA;
import static io.confluent.ksql.logging.processing.ProcessingLogMessageSchema.TYPE;
import static io.confluent.ksql.logging.processing.ProcessingLogMessageSchema.*;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
Expand Down Expand Up @@ -54,7 +49,8 @@ public void shouldSetNullRecordToNull() {
// When:
final SchemaAndValue msg = SerdeProcessingLogMessageFactory.deserializationErrorMsg(
error,
Optional.empty()
Optional.empty(),
"topic"
).apply(config);

// Then:
Expand All @@ -68,7 +64,8 @@ public void shouldBuildDeserializationError() {
// When:
final SchemaAndValue msg = SerdeProcessingLogMessageFactory.deserializationErrorMsg(
error,
Optional.of(record)
Optional.of(record),
"topic"
).apply(config);

// Then:
Expand All @@ -91,6 +88,10 @@ public void shouldBuildDeserializationError() {
deserializationError.get(DESERIALIZATION_ERROR_FIELD_RECORD_B64),
equalTo(Base64.getEncoder().encodeToString(record))
);
assertThat(
deserializationError.get(DESERIALIZATION_ERROR_FIELD_TOPIC),
equalTo("topic")
);
schema.fields().forEach(
f -> {
if (!ImmutableList.of(TYPE, DESERIALIZATION_ERROR).contains(f.name())) {
Expand All @@ -110,7 +111,8 @@ public void shouldBuildDeserializationErrorWithNullRecordIfIncludeRowFalse() {
// When:
final SchemaAndValue msg = SerdeProcessingLogMessageFactory.deserializationErrorMsg(
error,
Optional.of(record)
Optional.of(record),
"topic"
).apply(config);

// Then:
Expand Down

0 comments on commit 0f7edf6

Please sign in to comment.