Skip to content

Commit

Permalink
Merge pull request #672 from YotpoLtd/decode-null-messages-using-avro…
Browse files Browse the repository at this point in the history
…-deserializer

Send null messages to kafka avro deserializer
  • Loading branch information
HenryCaiHaiying authored Apr 14, 2019
2 parents 7d78f3b + e11ccb8 commit 985eccc
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,14 @@ protected void init(SecorConfig config) {
}

public GenericRecord decodeMessage(String topic, byte[] message) {
if (message.length == 0) {
message = null;
}
GenericRecord record = (GenericRecord) deserializer.deserialize(topic, message);
Schema schema = record.getSchema();
schemas.put(topic, schema);
if (record != null) {
Schema schema = record.getSchema();
schemas.put(topic, schema);
}
return record;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,5 +99,8 @@ public void testDecodeMessage() throws Exception {
assertEquals(output.get("data_field_1"), 1);
assertTrue(StringUtils.equals((output.get("data_field_2")).toString(), "hello"));
assertEquals(output.get("timestamp"), 1467176316L);

output = secorSchemaRegistryClient.decodeMessage("test-avr-topic", new byte[0]);
assertNull(output);
}
}

0 comments on commit 985eccc

Please sign in to comment.