From 6c615e61e3a144e2923bd560bf4435b4739311bc Mon Sep 17 00:00:00 2001 From: Cong Ding Date: Tue, 14 Sep 2021 11:47:54 -0500 Subject: [PATCH] KAFKA-13149; Fix NPE when handling malformed record data in produce requests (#11080) Raise `InvalidRecordException` from `DefaultRecordBatch.readFrom` instead of returning null if there are not enough bytes remaining to read the record. This ensures that the broker can raise a useful exception for malformed record batches. Reviewers: Ismael Juma , Jason Gustafson --- .../org/apache/kafka/common/record/DefaultRecord.java | 4 +++- .../apache/kafka/common/record/DefaultRecordTest.java | 10 ++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java index b63773bf95931..8772556b1dec1 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java @@ -293,7 +293,9 @@ public static DefaultRecord readFrom(ByteBuffer buffer, Long logAppendTime) { int sizeOfBodyInBytes = ByteUtils.readVarint(buffer); if (buffer.remaining() < sizeOfBodyInBytes) - return null; + throw new InvalidRecordException("Invalid record size: expected " + sizeOfBodyInBytes + + " bytes in record payload, but instead the buffer has only " + buffer.remaining() + + " remaining bytes."); int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes; return readFrom(buffer, totalSizeInBytes, sizeOfBodyInBytes, baseOffset, baseTimestamp, diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java index af154d321d249..49743d2320135 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java @@ -481,4 +481,14 @@ public void testSerdeNoSequence() throws IOException { assertEquals(RecordBatch.NO_SEQUENCE, record.sequence()); } + @Test + public void testInvalidSizeOfBodyInBytes() { + int sizeOfBodyInBytes = 10; + ByteBuffer buf = ByteBuffer.allocate(5); + ByteUtils.writeVarint(sizeOfBodyInBytes, buf); + + buf.flip(); + assertThrows(InvalidRecordException.class, + () -> DefaultRecord.readFrom(buf, 0L, 0L, RecordBatch.NO_SEQUENCE, null)); + } }