Skip to content

Commit

Permalink
KAFKA-13149; Fix NPE when handling malformed record data in produce r…
Browse files Browse the repository at this point in the history
…equests (apache#11080)

Raise `InvalidRecordException` from `DefaultRecordBatch.readFrom` instead of returning null if there are not enough bytes remaining to read the record. This ensures that the broker can raise a useful exception for malformed record batches.

Reviewers: Ismael Juma <[email protected]>, Jason Gustafson <[email protected]>
  • Loading branch information
ccding authored and Ralph Debusmann committed Dec 22, 2021
1 parent 3493550 commit 6c615e6
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,9 @@ public static DefaultRecord readFrom(ByteBuffer buffer,
Long logAppendTime) {
int sizeOfBodyInBytes = ByteUtils.readVarint(buffer);
if (buffer.remaining() < sizeOfBodyInBytes)
return null;
throw new InvalidRecordException("Invalid record size: expected " + sizeOfBodyInBytes +
" bytes in record payload, but instead the buffer has only " + buffer.remaining() +
" remaining bytes.");

int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes;
return readFrom(buffer, totalSizeInBytes, sizeOfBodyInBytes, baseOffset, baseTimestamp,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,4 +481,14 @@ public void testSerdeNoSequence() throws IOException {
assertEquals(RecordBatch.NO_SEQUENCE, record.sequence());
}

@Test
public void testInvalidSizeOfBodyInBytes() {
int sizeOfBodyInBytes = 10;
ByteBuffer buf = ByteBuffer.allocate(5);
ByteUtils.writeVarint(sizeOfBodyInBytes, buf);

buf.flip();
assertThrows(InvalidRecordException.class,
() -> DefaultRecord.readFrom(buf, 0L, 0L, RecordBatch.NO_SEQUENCE, null));
}
}

0 comments on commit 6c615e6

Please sign in to comment.