KAFKA-16452: Don't throw OOORE when converting the offset to metadata (apache#15825)

Don't throw an OFFSET_OUT_OF_RANGE error when converting an offset to its metadata; instead, the leader should increment the high watermark itself after receiving subsequent fetch requests from followers. This situation can arise when a broker is elected leader while its checkpoint files are missing.
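
For context, the fix hinges on the difference between full and message-only offset metadata. A minimal sketch, illustration only (not code from the patch; the values are invented):

```scala
import org.apache.kafka.storage.internals.log.LogOffsetMetadata

// Full metadata knows where the offset lives on disk; message-only metadata does not.
val full        = new LogOffsetMetadata(500L, 400L, 1024) // offset, segment base offset, byte position
val messageOnly = new LogOffsetMetadata(500L)             // offset known, position unknown

assert(!full.messageOffsetOnly)
assert(messageOnly.messageOffsetOnly)
```

A leader elected without its checkpoint files may know the high watermark only as a bare offset; after this patch it keeps serving with message-only metadata and fills in the full position from later follower fetches instead of failing with OFFSET_OUT_OF_RANGE.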

Reviewers: Luke Chen <[email protected]>, Jun Rao <[email protected]>
kamalcph authored and wernerdv committed Jun 3, 2024
1 parent a6196a3 commit b92dd52
Showing 18 changed files with 448 additions and 51 deletions.
19 changes: 13 additions & 6 deletions core/src/main/scala/kafka/log/LocalLog.scala
@@ -383,12 +383,19 @@ class LocalLog(@volatile private var _dir: File,
val segment = segmentOpt.get
val baseOffset = segment.baseOffset

-      val maxPosition =
-        // Use the max offset position if it is on this segment; otherwise, the segment size is the limit.
-        if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) maxOffsetMetadata.relativePositionInSegment
-        else segment.size
-
-      fetchDataInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
+      // 1. If `maxOffsetMetadata#segmentBaseOffset < segment#baseOffset`, then return maxPosition as empty.
+      // 2. Use the max-offset position if it is on this segment; otherwise, the segment size is the limit.
+      // 3. When maxOffsetMetadata is message-offset-only, we don't know the relativePositionInSegment, so
+      //    return maxPosition as empty to avoid reading beyond the max offset.
+      val maxPositionOpt: Optional[java.lang.Long] =
+        if (segment.baseOffset < maxOffsetMetadata.segmentBaseOffset)
+          Optional.of(segment.size)
+        else if (segment.baseOffset == maxOffsetMetadata.segmentBaseOffset && !maxOffsetMetadata.messageOffsetOnly())
+          Optional.of(maxOffsetMetadata.relativePositionInSegment)
+        else
+          Optional.empty()
+
+      fetchDataInfo = segment.read(startOffset, maxLength, maxPositionOpt, minOneMessage)
if (fetchDataInfo != null) {
if (includeAbortedTxns)
fetchDataInfo = addAbortedTransactions(startOffset, segment, fetchDataInfo)
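
Restating the new branch as a standalone function makes the three cases easier to scan. This is a sketch mirroring the hunk above; `maxPositionFor` is a name invented here, not part of the patch:

```scala
import java.util.Optional
import org.apache.kafka.storage.internals.log.LogOffsetMetadata

def maxPositionFor(segmentBaseOffset: Long, segmentSize: Int,
                   maxOffsetMetadata: LogOffsetMetadata): Optional[java.lang.Long] =
  if (segmentBaseOffset < maxOffsetMetadata.segmentBaseOffset)
    Optional.of(segmentSize.toLong)  // max offset lies on a newer segment: this whole segment is readable
  else if (segmentBaseOffset == maxOffsetMetadata.segmentBaseOffset && !maxOffsetMetadata.messageOffsetOnly)
    Optional.of(maxOffsetMetadata.relativePositionInSegment.toLong) // same segment: cap at its byte position
  else
    Optional.empty()  // older segment, or position unknown: expose nothing beyond the max offset
```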
21 changes: 14 additions & 7 deletions core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -361,7 +361,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
val offsetMetadata = highWatermarkMetadata
if (offsetMetadata.messageOffsetOnly) {
lock.synchronized {
-        val fullOffset = convertToOffsetMetadataOrThrow(highWatermark)
+        val fullOffset = maybeConvertToOffsetMetadata(highWatermark)
updateHighWatermarkMetadata(fullOffset)
fullOffset
}
@@ -405,7 +405,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
case Some(offsetMetadata) if offsetMetadata.messageOffset < highWatermarkMetadata.messageOffset =>
if (offsetMetadata.messageOffsetOnly) {
lock synchronized {
-          val fullOffset = convertToOffsetMetadataOrThrow(offsetMetadata.messageOffset)
+          val fullOffset = maybeConvertToOffsetMetadata(offsetMetadata.messageOffset)
if (firstUnstableOffsetMetadata.contains(offsetMetadata))
firstUnstableOffsetMetadata = Some(fullOffset)
fullOffset
@@ -964,7 +964,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
val updatedFirstUnstableOffset = producerStateManager.firstUnstableOffset.asScala match {
case Some(logOffsetMetadata) if logOffsetMetadata.messageOffsetOnly || logOffsetMetadata.messageOffset < logStartOffset =>
val offset = math.max(logOffsetMetadata.messageOffset, logStartOffset)
-        Some(convertToOffsetMetadataOrThrow(offset))
+        Some(maybeConvertToOffsetMetadata(offset))
case other => other
}

@@ -1425,11 +1425,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,

/**
* Given a message offset, find its corresponding offset metadata in the log.
-   * If the message offset is out of range, throw an OffsetOutOfRangeException
+   * 1. If the message offset is less than the log-start-offset or the local-log-start-offset, then it returns
+   *    message-only metadata.
+   * 2. If the message offset is beyond the log-end-offset, then it returns message-only metadata.
+   * 3. In all other cases, it returns the offset metadata from the log.
    */
-  private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
-    checkLogStartOffset(offset)
-    localLog.convertToOffsetMetadataOrThrow(offset)
+  private[log] def maybeConvertToOffsetMetadata(offset: Long): LogOffsetMetadata = {
+    try {
+      localLog.convertToOffsetMetadataOrThrow(offset)
+    } catch {
+      case _: OffsetOutOfRangeException =>
+        new LogOffsetMetadata(offset)
+    }
}

/**
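
The same contract, emulated standalone so the fallback is visible. `convert` and `lookup` are stand-in names for this sketch, with `lookup` playing the role of `localLog.convertToOffsetMetadataOrThrow`:

```scala
import org.apache.kafka.common.errors.OffsetOutOfRangeException
import org.apache.kafka.storage.internals.log.LogOffsetMetadata

def convert(offset: Long)(lookup: Long => LogOffsetMetadata): LogOffsetMetadata =
  try lookup(offset)
  catch { case _: OffsetOutOfRangeException => new LogOffsetMetadata(offset) }

// Case 3: an in-range offset resolves to full metadata.
val full = convert(150L)(o => new LogOffsetMetadata(o, 100L, 4096))
assert(!full.messageOffsetOnly)

// Cases 1 and 2: out-of-range offsets now degrade to message-only metadata
// instead of propagating the exception into the read/fetch paths.
val degraded = convert(50L)(_ => throw new OffsetOutOfRangeException("below the log start offset"))
assert(degraded.messageOffsetOnly)
```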
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
@@ -223,7 +223,7 @@ final class KafkaMetadataLog private (

override def highWatermark: LogOffsetMetadata = {
val hwm = log.fetchOffsetSnapshot.highWatermark
-    val segmentPosition: Optional[OffsetMetadata] = if (hwm.messageOffsetOnly) {
+    val segmentPosition: Optional[OffsetMetadata] = if (!hwm.messageOffsetOnly) {
Optional.of(SegmentPosition(hwm.segmentBaseOffset, hwm.relativePositionInSegment))
} else {
Optional.empty()
14 changes: 7 additions & 7 deletions core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -91,19 +91,19 @@ class DelayedFetch(
// Go directly to the check for Case G if the message offsets are the same. If the log segment
// has just rolled, then the high watermark offset will remain the same but be on the old segment,
// which would incorrectly be seen as an instance of Case F.
-            if (endOffset.messageOffset != fetchOffset.messageOffset) {
-              if (endOffset.onOlderSegment(fetchOffset)) {
-                // Case F, this can happen when the new fetch operation is on a truncated leader
-                debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
-                return forceComplete()
-              } else if (fetchOffset.onOlderSegment(endOffset)) {
+            if (fetchOffset.messageOffset > endOffset.messageOffset) {
+              // Case F, this can happen when the new fetch operation is on a truncated leader
+              debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
+              return forceComplete()
+            } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
+              if (fetchOffset.onOlderSegment(endOffset)) {
// Case F, this can happen when the fetch operation is falling behind the current segment
// or the partition has just rolled a new segment
debug(s"Satisfying fetch $this immediately since it is fetching older segments.")
// We will not force complete the fetch request if a replica should be throttled.
if (!params.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
return forceComplete()
-            } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
+            } else if (fetchOffset.onSameSegment(endOffset)) {
// we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition)
val bytesAvailable = math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes)
if (!params.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
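
Why this rewrite is safe with a message-only high watermark: the companion change to LogOffsetMetadata lives in the storage module (not shown on this page) and is assumed to make segment comparisons on message-only metadata return false rather than throw. A sketch of that assumed semantics, re-implemented here for illustration only:

```scala
import org.apache.kafka.storage.internals.log.LogOffsetMetadata

// Assumed behaviour, not the library implementation.
def onOlderSegment(a: LogOffsetMetadata, b: LogOffsetMetadata): Boolean =
  !a.messageOffsetOnly && !b.messageOffsetOnly && a.segmentBaseOffset < b.segmentBaseOffset

val messageOnlyHwm = new LogOffsetMetadata(500L)             // HWM known only as an offset
val fetchPos       = new LogOffsetMetadata(450L, 400L, 1024)

// Neither "later segments" nor "older segments" fires, so the delayed fetch
// stays pending until the leader has full metadata; the new
// testDelayedFetchWithMessageOnlyHighWatermark below exercises exactly this.
assert(!onOlderSegment(fetchPos, messageOnlyHwm) && !onOlderSegment(messageOnlyHwm, fetchPos))
```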
core/src/test/scala/unit/kafka/server/DelayedFetchTest.scala
@@ -28,6 +28,8 @@ import org.apache.kafka.common.requests.FetchRequest
import org.apache.kafka.storage.internals.log.{FetchDataInfo, FetchIsolation, FetchParams, FetchPartitionData, LogOffsetMetadata, LogOffsetSnapshot}
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import org.mockito.ArgumentMatchers.{any, anyInt}
import org.mockito.Mockito.{mock, when}

@@ -46,7 +48,7 @@ class DelayedFetchTest {

val fetchStatus = FetchPartitionStatus(
startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
-      fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+      fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId(), fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)

var fetchResultOpt: Option[FetchPartitionData] = None
@@ -92,7 +94,7 @@

val fetchStatus = FetchPartitionStatus(
startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
-      fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+      fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId(), fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)

var fetchResultOpt: Option[FetchPartitionData] = None
@@ -116,6 +118,9 @@
assertTrue(delayedFetch.tryComplete())
assertTrue(delayedFetch.isCompleted)
assertTrue(fetchResultOpt.isDefined)

+    val fetchResult = fetchResultOpt.get
+    assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, fetchResult.error)
}

@Test
@@ -164,18 +169,71 @@
assertTrue(delayedFetch.tryComplete())
assertTrue(delayedFetch.isCompleted)
assertTrue(fetchResultOpt.isDefined)

+    val fetchResult = fetchResultOpt.get
+    assertEquals(Errors.NONE, fetchResult.error)
}

+  @ParameterizedTest(name = "testDelayedFetchWithMessageOnlyHighWatermark endOffset={0}")
+  @ValueSource(longs = Array(0, 500))
+  def testDelayedFetchWithMessageOnlyHighWatermark(endOffset: Long): Unit = {
+    val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
+    val fetchOffset = 450L
+    val logStartOffset = 5L
+    val currentLeaderEpoch = Optional.of[Integer](10)
+    val replicaId = 1
+
+    val fetchStatus = FetchPartitionStatus(
+      startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+      fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+    val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)
+
+    var fetchResultOpt: Option[FetchPartitionData] = None
+    def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
+      fetchResultOpt = Some(responses.head._2)
+    }
+
+    val delayedFetch = new DelayedFetch(
+      params = fetchParams,
+      fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus),
+      replicaManager = replicaManager,
+      quota = replicaQuota,
+      responseCallback = callback
+    )
+
+    val partition: Partition = mock(classOf[Partition])
+    when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition)
+    // Note that the high-watermark does not contain the complete metadata
+    val endOffsetMetadata = new LogOffsetMetadata(endOffset, -1L, -1)
+    when(partition.fetchOffsetSnapshot(
+      currentLeaderEpoch,
+      fetchOnlyFromLeader = true))
+      .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, endOffsetMetadata, endOffsetMetadata))
+    when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false)
+    expectReadFromReplica(fetchParams, topicIdPartition, fetchStatus.fetchInfo, Errors.NONE)
+
+    // 1. When `endOffset` is 0, it refers to the truncation case
+    // 2. When `endOffset` is 500, we won't complete because it doesn't contain offset metadata
+    val expected = endOffset == 0
+    assertEquals(expected, delayedFetch.tryComplete())
+    assertEquals(expected, delayedFetch.isCompleted)
+    assertEquals(expected, fetchResultOpt.isDefined)
+    if (fetchResultOpt.isDefined) {
+      assertEquals(Errors.NONE, fetchResultOpt.get.error)
+    }
+  }

private def buildFollowerFetchParams(
replicaId: Int,
-    maxWaitMs: Int
+    maxWaitMs: Int,
+    minBytes: Int = 1,
): FetchParams = {
new FetchParams(
ApiKeys.FETCH.latestVersion,
replicaId,
1,
maxWaitMs,
-      1,
+      minBytes,
maxBytes,
FetchIsolation.LOG_END,
Optional.empty()
18 changes: 18 additions & 0 deletions core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
@@ -184,6 +184,24 @@ final class KafkaMetadataLogTest {
)
}

+  @Test
+  def testHighWatermarkOffsetMetadata(): Unit = {
+    val numberOfRecords = 10
+    val epoch = 1
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    append(log, numberOfRecords, epoch)
+    log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
+
+    val highWatermarkMetadata = log.highWatermark
+    assertEquals(numberOfRecords, highWatermarkMetadata.offset)
+    assertTrue(highWatermarkMetadata.metadata.isPresent)
+
+    val segmentPosition = highWatermarkMetadata.metadata.get().asInstanceOf[SegmentPosition]
+    assertEquals(0, segmentPosition.baseOffset)
+    assertTrue(segmentPosition.relativePosition > 0)
+  }

@Test
def testCreateSnapshotBeforeLogStartOffset(): Unit = {
val numberOfRecords = 10
37 changes: 37 additions & 0 deletions core/src/test/scala/unit/kafka/log/LocalLogTest.scala
@@ -372,6 +372,43 @@ class LocalLogTest {
assertTrue(fetchDataInfo.records.records.asScala.isEmpty)
}

+  @Test
+  def testWhenFetchOffsetHigherThanMaxOffset(): Unit = {
+    val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
+    for (offset <- 0 to 4) {
+      appendRecords(List(record), initialOffset = offset)
+      if (offset % 2 != 0)
+        log.roll()
+    }
+    assertEquals(3, log.segments.numberOfSegments)
+
+    // case-0: valid case, `startOffset` < `maxOffsetMetadata.offset`
+    var fetchDataInfo = readRecords(startOffset = 3L, maxOffsetMetadata = new LogOffsetMetadata(4L, 4L, 0))
+    assertEquals(1, fetchDataInfo.records.records.asScala.size)
+    assertEquals(new LogOffsetMetadata(3, 2L, 69), fetchDataInfo.fetchOffsetMetadata)
+
+    // case-1: `startOffset` == `maxOffsetMetadata.offset`
+    fetchDataInfo = readRecords(startOffset = 4L, maxOffsetMetadata = new LogOffsetMetadata(4L, 4L, 0))
+    assertTrue(fetchDataInfo.records.records.asScala.isEmpty)
+    assertEquals(new LogOffsetMetadata(4L, 4L, 0), fetchDataInfo.fetchOffsetMetadata)
+
+    // case-2: `startOffset` > `maxOffsetMetadata.offset`
+    fetchDataInfo = readRecords(startOffset = 5L, maxOffsetMetadata = new LogOffsetMetadata(4L, 4L, 0))
+    assertTrue(fetchDataInfo.records.records.asScala.isEmpty)
+    assertEquals(new LogOffsetMetadata(5L, 4L, 69), fetchDataInfo.fetchOffsetMetadata)
+
+    // case-3: `startOffset` < `maxOffsetMetadata.offset` but `maxOffsetMetadata.messageOffsetOnly` is true
+    fetchDataInfo = readRecords(startOffset = 3L, maxOffsetMetadata = new LogOffsetMetadata(4L, -1L, -1))
+    assertTrue(fetchDataInfo.records.records.asScala.isEmpty)
+    assertEquals(new LogOffsetMetadata(3L, 2L, 69), fetchDataInfo.fetchOffsetMetadata)
+
+    // case-4: `startOffset` < `maxOffsetMetadata.offset`, `maxOffsetMetadata.messageOffsetOnly` is false, but
+    //         `maxOffsetMetadata.segmentBaseOffset` < `startOffset.segmentBaseOffset`
+    fetchDataInfo = readRecords(startOffset = 3L, maxOffsetMetadata = new LogOffsetMetadata(4L, 0L, 40))
+    assertTrue(fetchDataInfo.records.records.asScala.isEmpty)
+    assertEquals(new LogOffsetMetadata(3L, 2L, 69), fetchDataInfo.fetchOffsetMetadata)
+  }

@Test
def testTruncateTo(): Unit = {
for (offset <- 0 to 11) {
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -358,7 +358,7 @@ class LogLoaderTest {
val wrapper = Mockito.spy(segment)
Mockito.doAnswer { in =>
segmentsWithReads += wrapper
-      segment.read(in.getArgument(0, classOf[java.lang.Long]), in.getArgument(1, classOf[java.lang.Integer]), in.getArgument(2, classOf[java.lang.Long]), in.getArgument(3, classOf[java.lang.Boolean]))
+      segment.read(in.getArgument(0, classOf[java.lang.Long]), in.getArgument(1, classOf[java.lang.Integer]), in.getArgument(2, classOf[java.util.Optional[java.lang.Long]]), in.getArgument(3, classOf[java.lang.Boolean]))
}.when(wrapper).read(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any())
Mockito.doAnswer { in =>
recoveredSegments += wrapper
32 changes: 30 additions & 2 deletions core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -30,7 +30,7 @@ import org.apache.kafka.storage.internals.log._
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.CsvSource
+import org.junit.jupiter.params.provider.{CsvSource, ValueSource}

import java.io.{File, RandomAccessFile}
import java.util
@@ -144,6 +144,34 @@ class LogSegmentTest {
checkEquals(ms2.records.iterator, read.records.records.iterator)
}

+  @ParameterizedTest(name = "testReadWhenNoMaxPosition minOneMessage = {0}")
+  @ValueSource(booleans = Array(true, false))
+  def testReadWhenNoMaxPosition(minOneMessage: Boolean): Unit = {
+    val maxPosition: Optional[java.lang.Long] = Optional.empty()
+    val maxSize = 1
+    val seg = createSegment(40)
+    val ms = records(50, "hello", "there")
+    seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms)
+    // read before first offset
+    var read = seg.read(48, maxSize, maxPosition, minOneMessage)
+    assertEquals(new LogOffsetMetadata(48, 40, 0), read.fetchOffsetMetadata)
+    assertTrue(read.records.records().iterator().asScala.isEmpty)
+    // read at first offset
+    read = seg.read(50, maxSize, maxPosition, minOneMessage)
+    assertEquals(new LogOffsetMetadata(50, 40, 0), read.fetchOffsetMetadata)
+    assertTrue(read.records.records().iterator().asScala.isEmpty)
+    // read at last offset
+    read = seg.read(51, maxSize, maxPosition, minOneMessage)
+    assertEquals(new LogOffsetMetadata(51, 40, 39), read.fetchOffsetMetadata)
+    assertTrue(read.records.records().iterator().asScala.isEmpty)
+    // read at log-end-offset
+    read = seg.read(52, maxSize, maxPosition, minOneMessage)
+    assertNull(read)
+    // read beyond log-end-offset
+    read = seg.read(53, maxSize, maxPosition, minOneMessage)
+    assertNull(read)
+  }

/**
* In a loop append two messages then truncate off the second of those messages and check that we can read
* the first but not the second message.
@@ -331,7 +359,7 @@
writeNonsenseToFile(indexFile, 5, indexFile.length.toInt)
seg.recover(newProducerStateManager(), Optional.empty())
for (i <- 0 until 100) {
-      val records = seg.read(i, 1, seg.size(), true).records.records
+      val records = seg.read(i, 1, Optional.of(seg.size()), true).records.records
assertEquals(i, records.iterator.next().offset)
}
}
(Diffs for the remaining 9 of the 18 changed files are not shown here.)
