Skip to content

Commit

Permalink
Add unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
kamalcph committed May 20, 2024
1 parent 6452a45 commit 51b6bb8
Showing 1 changed file with 37 additions and 0 deletions.
37 changes: 37 additions & 0 deletions core/src/test/scala/unit/kafka/log/LocalLogTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,43 @@ class LocalLogTest {
assertTrue(fetchDataInfo.records.records.asScala.isEmpty)
}

@Test
def testWhenFetchOffsetHigherThanMaxOffset(): Unit = {
val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
for (offset <- 0 to 4) {
appendRecords(List(record), initialOffset = offset)
if (offset % 2 != 0)
log.roll()
}
assertEquals(3, log.segments.numberOfSegments)

// case-0: valid case, `startOffset` < `maxOffsetMetadata.offset`
var fetchDataInfo = readRecords(startOffset = 3L, maxOffsetMetadata = new LogOffsetMetadata(4L, 4L, 0))
assertEquals(1, fetchDataInfo.records.records.asScala.size)
assertEquals(new LogOffsetMetadata(3, 2L, 69), fetchDataInfo.fetchOffsetMetadata)

// case-1: `startOffset` == `maxOffsetMetadata.offset`
fetchDataInfo = readRecords(startOffset = 4L, maxOffsetMetadata = new LogOffsetMetadata(4L, 4L, 0))
assertTrue(fetchDataInfo.records.records.asScala.isEmpty)
assertEquals(new LogOffsetMetadata(4L, 4L, 0), fetchDataInfo.fetchOffsetMetadata)

// case-2: `startOffset` > `maxOffsetMetadata.offset`
fetchDataInfo = readRecords(startOffset = 5L, maxOffsetMetadata = new LogOffsetMetadata(4L, 4L, 0))
assertTrue(fetchDataInfo.records.records.asScala.isEmpty)
assertEquals(new LogOffsetMetadata(5L, -1L, -1), fetchDataInfo.fetchOffsetMetadata)

// case-3: `startOffset` < `maxMessageOffset.offset` but `maxMessageOffset.messageOnlyOffset` is true
fetchDataInfo = readRecords(startOffset = 3L, maxOffsetMetadata = new LogOffsetMetadata(4L, -1L, -1))
assertTrue(fetchDataInfo.records.records.asScala.isEmpty)
assertEquals(new LogOffsetMetadata(3L, -1L, -1), fetchDataInfo.fetchOffsetMetadata)

// case-4: `startOffset` < `maxMessageOffset.offset`, `maxMessageOffset.messageOnlyOffset` is false, but
// `maxOffsetMetadata.segmentBaseOffset` < `startOffset.segmentBaseOffset`
fetchDataInfo = readRecords(startOffset = 3L, maxOffsetMetadata = new LogOffsetMetadata(4L, 0L, 40))
assertTrue(fetchDataInfo.records.records.asScala.isEmpty)
assertEquals(new LogOffsetMetadata(3L, -1L, -1), fetchDataInfo.fetchOffsetMetadata)
}

@Test
def testTruncateTo(): Unit = {
for (offset <- 0 to 11) {
Expand Down

0 comments on commit 51b6bb8

Please sign in to comment.