Skip to content

Commit

Permalink
fix: fix NPE when printing records with empty value (MINOR) (#3470)
Browse files Browse the repository at this point in the history
  • Loading branch information
vcrfxia authored Oct 3, 2019
1 parent 81557e3 commit 47313ff
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ public List<String> format(final ConsumerRecords<String, Bytes> records) {
.stream(records.records(topicName).spliterator(), false)
.filter(Objects::nonNull)
.filter(r -> r.value() != null)
.filter(r -> r.value().get() != null)
.filter(r -> r.value().get().length != 0)
.map((record) -> {
if (formatter == null) {
formatter = getFormatter(record);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,15 @@

public class TopicStreamTest {

private static final String TOPIC_NAME = "some-topic";

private SchemaRegistryClient schemaRegistryClient;
private RecordFormatter formatter;

@Before
public void setUp() {
schemaRegistryClient = mock(SchemaRegistryClient.class);
formatter = new RecordFormatter(schemaRegistryClient, TOPIC_NAME);
}

@Test
Expand All @@ -83,7 +87,7 @@ public void shouldMatchAvroFormatter() throws Exception {
final byte[] avroData = serializeAvroRecord(avroRecord);

// When:
final Result result = getFormatter(avroData);
final Result result = getFormattedResult(avroData);

// Then:
assertThat(result.format, is(Format.AVRO));
Expand All @@ -97,7 +101,7 @@ public void shouldNotMatchAvroFormatter() {
final String notAvro = "test-data";

// When:
final Result result = getFormatter(notAvro);
final Result result = getFormattedResult(notAvro);

// Then:
assertThat(result.format, is(not(Format.AVRO)));
Expand All @@ -114,7 +118,7 @@ public void shouldFormatJson() {
"}";

// When:
final Result result = getFormatter(json);
final Result result = getFormattedResult(json);

// Then:
assertThat(result.format, is(Format.JSON));
Expand All @@ -133,7 +137,7 @@ public void shouldNotMatchJsonFormatter() {
"}";

// When:
final Result result = getFormatter(notJson);
final Result result = getFormattedResult(notJson);

// Then:
assertThat(result.format, is(not(Format.JSON)));
Expand All @@ -147,25 +151,46 @@ public void shouldMatchStringFormatWithOneColumnValues() {
final String stringValue = "v1";

// When:
final Result result = getFormatter(stringValue);
final Result result = getFormattedResult(stringValue);

// Then:
assertThat(result.format, is(Format.STRING));
}

@Test
public void shouldFilterNullValues() {
// Given:
replay(schemaRegistryClient);

final ConsumerRecord<String, Bytes> record = new ConsumerRecord<>(
"some-topic", 1, 1, "key", null);
final RecordFormatter formatter =
new RecordFormatter(schemaRegistryClient, "some-topic");
final ConsumerRecords<String, Bytes> records = new ConsumerRecords<>(
ImmutableMap.of(new TopicPartition("some-topic", 1),
ImmutableList.of(record)));
// When:
final List<String> formatted = getFormattedRecord(null);

// Then:
assertThat(formatted, empty());
}

assertThat(formatter.format(records), empty());
@Test
public void shouldFilterNullBytesValues() {
// Given:
replay(schemaRegistryClient);

// When:
final List<String> formatted = getFormattedRecord(new Bytes(null));

// Then:
assertThat(formatted, empty());
}

@Test
public void shouldFilterEmptyValues() {
// Given:
replay(schemaRegistryClient);

// When:
final List<String> formatted = getFormattedRecord(new Bytes(Bytes.EMPTY));

// Then:
assertThat(formatted, empty());
}

@Test
Expand All @@ -174,34 +199,35 @@ public void shouldHandleNullValuesFromSTRINGPrint() throws IOException {
SimpleDateFormat.getDateTimeInstance(3, 1, Locale.getDefault());

final ConsumerRecord<String, Bytes> record = new ConsumerRecord<>(
"some-topic", 1, 1, "key", null);
TOPIC_NAME, 1, 1, "key", null);

final String formatted =
Format.STRING.maybeGetFormatter(
"some-topic", record, null, dateFormat).get().print(record);
TOPIC_NAME, record, null, dateFormat).get().print(record);

assertThat(formatted, endsWith(", key , NULL\n"));
}

private Result getFormatter(final String data) {
return getFormatter(data.getBytes(StandardCharsets.UTF_8));
private Result getFormattedResult(final String data) {
return getFormattedResult(data.getBytes(StandardCharsets.UTF_8));
}

private Result getFormatter(final byte[] data) {
final ConsumerRecord<String, Bytes> record = new ConsumerRecord<>(
"some-topic", 1, 1, "key", new Bytes(data));
private Result getFormattedResult(final byte[] data) {
final List<String> formatted = getFormattedRecord(new Bytes(data));
assertThat("Only expect one line", formatted, hasSize(1));

final RecordFormatter formatter =
new RecordFormatter(schemaRegistryClient, "some-topic");
return new Result(formatter.getFormat(), formatted.get(0));
}

final ConsumerRecords<String, Bytes> records = new ConsumerRecords<>(
ImmutableMap.of(new TopicPartition("some-topic", 1),
ImmutableList.of(record)));
private List<String> getFormattedRecord(final Bytes data) {
final ConsumerRecord<String, Bytes> record = new ConsumerRecord<>(
TOPIC_NAME, 1, 1, "key", data);

final List<String> formatted = formatter.format(records);
assertThat("Only expect one line", formatted, hasSize(1));
final ConsumerRecords<String, Bytes> records = new ConsumerRecords<>(
ImmutableMap.of(new TopicPartition(TOPIC_NAME, 1),
ImmutableList.of(record)));

return new Result(formatter.getFormat(), formatted.get(0));
return formatter.format(records);
}

@SuppressWarnings("SameParameterValue")
Expand Down

0 comments on commit 47313ff

Please sign in to comment.