From 47313ff454ea78dbc8182a96b0e9fbfb0920e1f6 Mon Sep 17 00:00:00 2001 From: Victoria Xia Date: Thu, 3 Oct 2019 15:50:46 -0700 Subject: [PATCH] fix: fix NPE when printing records with empty value (MINOR) (#3470) --- .../resources/streaming/TopicStream.java | 2 + .../resources/streaming/TopicStreamTest.java | 82 ++++++++++++------- 2 files changed, 56 insertions(+), 28 deletions(-) diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/TopicStream.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/TopicStream.java index c24b1e132b28..464f933105b2 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/TopicStream.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/TopicStream.java @@ -68,6 +68,8 @@ public List format(final ConsumerRecords records) { .stream(records.records(topicName).spliterator(), false) .filter(Objects::nonNull) .filter(r -> r.value() != null) + .filter(r -> r.value().get() != null) + .filter(r -> r.value().get().length != 0) .map((record) -> { if (formatter == null) { formatter = getFormatter(record); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/TopicStreamTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/TopicStreamTest.java index e23cbf5def91..7f9e76491446 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/TopicStreamTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/TopicStreamTest.java @@ -53,11 +53,15 @@ public class TopicStreamTest { + private static final String TOPIC_NAME = "some-topic"; + private SchemaRegistryClient schemaRegistryClient; + private RecordFormatter formatter; @Before public void setUp() { schemaRegistryClient = mock(SchemaRegistryClient.class); + formatter = new RecordFormatter(schemaRegistryClient, TOPIC_NAME); } @Test @@ -83,7 +87,7 @@ public void shouldMatchAvroFormatter() throws Exception { final byte[] avroData = serializeAvroRecord(avroRecord); // When: - final Result result = getFormatter(avroData); + final Result result = getFormattedResult(avroData); // Then: assertThat(result.format, is(Format.AVRO)); @@ -97,7 +101,7 @@ public void shouldNotMatchAvroFormatter() { final String notAvro = "test-data"; // When: - final Result result = getFormatter(notAvro); + final Result result = getFormattedResult(notAvro); // Then: assertThat(result.format, is(not(Format.AVRO))); @@ -114,7 +118,7 @@ public void shouldFormatJson() { "}"; // When: - final Result result = getFormatter(json); + final Result result = getFormattedResult(json); // Then: assertThat(result.format, is(Format.JSON)); @@ -133,7 +137,7 @@ public void shouldNotMatchJsonFormatter() { "}"; // When: - final Result result = getFormatter(notJson); + final Result result = getFormattedResult(notJson); // Then: assertThat(result.format, is(not(Format.JSON))); @@ -147,7 +151,7 @@ public void shouldMatchStringFormatWithOneColumnValues() { final String stringValue = "v1"; // When: - final Result result = getFormatter(stringValue); + final Result result = getFormattedResult(stringValue); // Then: assertThat(result.format, is(Format.STRING)); @@ -155,17 +159,38 @@ public void shouldMatchStringFormatWithOneColumnValues() { @Test public void shouldFilterNullValues() { + // Given: replay(schemaRegistryClient); - final ConsumerRecord record = new ConsumerRecord<>( - "some-topic", 1, 1, "key", null); - final RecordFormatter formatter = - new RecordFormatter(schemaRegistryClient, "some-topic"); - final ConsumerRecords records = new ConsumerRecords<>( - ImmutableMap.of(new TopicPartition("some-topic", 1), - ImmutableList.of(record))); + // When: + final List formatted = getFormattedRecord(null); + + // Then: + assertThat(formatted, empty()); + } - assertThat(formatter.format(records), empty()); + @Test + public void shouldFilterNullBytesValues() { + // Given: + replay(schemaRegistryClient); + + // When: + final List formatted = getFormattedRecord(new Bytes(null)); + + // Then: + assertThat(formatted, empty()); + } + + @Test + public void shouldFilterEmptyValues() { + // Given: + replay(schemaRegistryClient); + + // When: + final List formatted = getFormattedRecord(new Bytes(Bytes.EMPTY)); + + // Then: + assertThat(formatted, empty()); } @Test @@ -174,34 +199,35 @@ public void shouldHandleNullValuesFromSTRINGPrint() throws IOException { SimpleDateFormat.getDateTimeInstance(3, 1, Locale.getDefault()); final ConsumerRecord record = new ConsumerRecord<>( - "some-topic", 1, 1, "key", null); + TOPIC_NAME, 1, 1, "key", null); final String formatted = Format.STRING.maybeGetFormatter( - "some-topic", record, null, dateFormat).get().print(record); + TOPIC_NAME, record, null, dateFormat).get().print(record); assertThat(formatted, endsWith(", key , NULL\n")); } - private Result getFormatter(final String data) { - return getFormatter(data.getBytes(StandardCharsets.UTF_8)); + private Result getFormattedResult(final String data) { + return getFormattedResult(data.getBytes(StandardCharsets.UTF_8)); } - private Result getFormatter(final byte[] data) { - final ConsumerRecord record = new ConsumerRecord<>( - "some-topic", 1, 1, "key", new Bytes(data)); + private Result getFormattedResult(final byte[] data) { + final List formatted = getFormattedRecord(new Bytes(data)); + assertThat("Only expect one line", formatted, hasSize(1)); - final RecordFormatter formatter = - new RecordFormatter(schemaRegistryClient, "some-topic"); + return new Result(formatter.getFormat(), formatted.get(0)); + } - final ConsumerRecords records = new ConsumerRecords<>( - ImmutableMap.of(new TopicPartition("some-topic", 1), - ImmutableList.of(record))); + private List getFormattedRecord(final Bytes data) { + final ConsumerRecord record = new ConsumerRecord<>( + TOPIC_NAME, 1, 1, "key", data); - final List formatted = formatter.format(records); - assertThat("Only expect one line", formatted, hasSize(1)); + final ConsumerRecords records = new ConsumerRecords<>( + ImmutableMap.of(new TopicPartition(TOPIC_NAME, 1), + ImmutableList.of(record))); - return new Result(formatter.getFormat(), formatted.get(0)); + return formatter.format(records); } @SuppressWarnings("SameParameterValue")