Skip to content

Commit

Permalink
KAFKA-9807; Protect LSO reads from concurrent high-watermark updates (#8418)
Browse files Browse the repository at this point in the history

If the high-watermark is updated in the middle of a read with the `read_committed` isolation level, it is possible to return data above the LSO. In the worst case, this can lead to the read of an aborted transaction. The root cause is that the logic depends on reading the high-watermark twice. We fix the problem by reading it once and caching the value.

Reviewers: David Arthur <[email protected]>, Guozhang Wang <[email protected]>, Ismael Juma <[email protected]>
  • Loading branch information
hachikuji authored Apr 3, 2020
1 parent 8595267 commit d4eb406
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 2 deletions.
7 changes: 5 additions & 2 deletions core/src/main/scala/kafka/log/Log.scala
Original file line number Diff line number Diff line change
Expand Up @@ -420,8 +420,11 @@ class Log(@volatile private var _dir: File,
private def fetchLastStableOffsetMetadata: LogOffsetMetadata = {
checkIfMemoryMappedBufferClosed()

// cache the current high watermark to avoid a concurrent update invalidating the range check
val highWatermarkMetadata = fetchHighWatermarkMetadata

firstUnstableOffsetMetadata match {
case Some(offsetMetadata) if offsetMetadata.messageOffset < highWatermark =>
case Some(offsetMetadata) if offsetMetadata.messageOffset < highWatermarkMetadata.messageOffset =>
if (offsetMetadata.messageOffsetOnly) {
lock synchronized {
val fullOffset = convertToOffsetMetadataOrThrow(offsetMetadata.messageOffset)
Expand All @@ -432,7 +435,7 @@ class Log(@volatile private var _dir: File,
} else {
offsetMetadata
}
case _ => fetchHighWatermarkMetadata
case _ => highWatermarkMetadata
}
}

Expand Down
54 changes: 54 additions & 0 deletions core/src/test/scala/unit/kafka/log/LogTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package kafka.log
import java.io._
import java.nio.ByteBuffer
import java.nio.file.{Files, Paths}
import java.util.concurrent.{Callable, Executors}
import java.util.regex.Pattern
import java.util.{Collections, Optional, Properties}

Expand Down Expand Up @@ -3649,6 +3650,59 @@ class LogTest {
assertEquals(None, log.firstUnstableOffset)
}

/**
 * Regression test for KAFKA-9807: a `FetchTxnCommitted` read that races with
 * high-watermark updates must never return records of a transaction that has
 * not been completed yet.
 *
 * One task repeatedly appends a single-record transaction, immediately reads
 * from the offset where that record was written, and then aborts the
 * transaction. A second task keeps moving the high watermark up to the log end
 * offset. Every read is issued before the abort marker is appended, so each
 * read is expected to be empty.
 */
@Test
def testReadCommittedWithConcurrentHighWatermarkUpdates(): Unit = {
  val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
  val log = createLog(logDir, logConfig)
  val lastOffset = 50L

  val producerEpoch = 0.toShort
  val producerId = 15L
  val appendProducer = appendTransactionalAsLeader(log, producerId, producerEpoch)

  // Thread 1 writes single-record transactions and attempts to read them
  // before they have been aborted, and then aborts them
  val txnWriteAndReadLoop: Callable[Int] = () => {
    var nonEmptyReads = 0
    while (log.logEndOffset < lastOffset) {
      // Remember where the next transactional record will land so the read
      // below starts exactly at the still-open transaction.
      val currentLogEndOffset = log.logEndOffset

      appendProducer(1)

      val readInfo = log.read(
        startOffset = currentLogEndOffset,
        maxLength = Int.MaxValue,
        isolation = FetchTxnCommitted,
        minOneMessage = false)

      if (readInfo.records.sizeInBytes() > 0)
        nonEmptyReads += 1

      appendEndTxnMarkerAsLeader(log, producerId, producerEpoch, ControlRecordType.ABORT)
    }
    nonEmptyReads
  }

  // Thread 2 watches the log and updates the high watermark. The interrupt
  // check lets `shutdownNow()` stop this loop if the writer fails before the
  // log end offset reaches `lastOffset`; otherwise it would spin forever.
  val hwUpdateLoop: Runnable = () => {
    while (log.logEndOffset < lastOffset && !Thread.currentThread().isInterrupted) {
      log.updateHighWatermark(log.logEndOffset)
    }
  }

  val executor = Executors.newFixedThreadPool(2)
  try {
    val hwUpdateFuture = executor.submit(hwUpdateLoop)

    val future = executor.submit(txnWriteAndReadLoop)
    val nonEmptyReads = future.get()

    // The writer has finished, so the updater's loop condition is now false
    // and this returns promptly. Waiting on it surfaces any exception thrown
    // by the updater; without this a crashed updater would be silently
    // ignored and the test would pass without any concurrent updates.
    hwUpdateFuture.get()

    assertEquals(0, nonEmptyReads)
  } finally {
    executor.shutdownNow()
  }
}

@Test
def testTransactionIndexUpdated(): Unit = {
val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
Expand Down

0 comments on commit d4eb406

Please sign in to comment.