Skip to content


KAFKA-739 Handle null message payloads in messages and in the log cleaner. Reviewed by Jun and Neha.
Browse files Browse the repository at this point in the history
  • Loading branch information
jkreps committed Mar 12, 2013
1 parent c1ed12e commit 9ff4e8e
Show file tree
Hide file tree
Showing 23 changed files with 504 additions and 191 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public class SimpleKafkaETLMapper implements

protected Text getData(Message message) throws IOException {
ByteBuffer buf = message.payload();
if(buf == null)
return new Text();

byte[] array = new byte[buf.limit()];
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/consumer/ConsumerIterator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk

val keyBuffer = item.message.key
val key = if(keyBuffer == null) null.asInstanceOf[K] else keyDecoder.fromBytes(Utils.readBytes(keyBuffer))
val value = valueDecoder.fromBytes(Utils.readBytes(item.message.payload))
val value = if(item.message.isNull) null.asInstanceOf[V] else valueDecoder.fromBytes(Utils.readBytes(item.message.payload))
new MessageAndMetadata(key, value, currentTopicInfo.topic, currentTopicInfo.partitionId, item.offset)

Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/log/CleanerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ package kafka.log
* @param hashAlgorithm The hash algorithm to use in key comparison.
case class CleanerConfig(val numThreads: Int = 1,
val dedupeBufferSize: Int = 4*1024*1024,
val dedupeBufferLoadFactor: Double = 0.75,
val dedupeBufferSize: Long = 4*1024*1024L,
val dedupeBufferLoadFactor: Double = 0.9d,
val ioBufferSize: Int = 1024*1024,
val maxMessageSize: Int = 32*1024*1024,
val maxIoBytesPerSecond: Double = Double.MaxValue,
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/log/FileMessageSet.scala
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,14 @@ class FileMessageSet private[kafka](@volatile var file: File,
def iterator(maxMessageSize: Int): Iterator[MessageAndOffset] = {
new IteratorTemplate[MessageAndOffset] {
var location = start
val sizeOffsetBuffer = ByteBuffer.allocate(12)

override def makeNext(): MessageAndOffset = {
if(location >= end)
return allDone()

// read the size of the item
val sizeOffsetBuffer = ByteBuffer.allocate(12)
sizeOffsetBuffer.rewind(), location)
return allDone()
Expand Down
104 changes: 74 additions & 30 deletions core/src/main/scala/kafka/log/LogCleaner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import java.nio._
import java.util.concurrent.Semaphore
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic._
import java.util.Date
import kafka.common._
import kafka.message._
Expand All @@ -39,8 +40,7 @@ import kafka.utils._
* The cleaning is carried out by a pool of background threads. Each thread chooses the dirtiest log that has the "dedupe" retention policy
* and cleans that. The dirtiness of the log is guessed by taking the ratio of bytes in the dirty section of the log to the total bytes in the log.
* To clean a log the cleaner first builds a mapping of key=>last_offset for the dirty section of the log. For memory efficiency this mapping
* is approximate. That is allowed to lose some key=>offset pairs, but never to return a wrong answer. See kafka.log.OffsetMap for details of
* To clean a log the cleaner first builds a mapping of key=>last_offset for the dirty section of the log. See kafka.log.OffsetMap for details of
* the implementation of the mapping.
* Once the key=>offset map is built, the log is cleaned by recopying each log segment but omitting any key that appears in the offset map with a
Expand All @@ -53,6 +53,11 @@ import kafka.utils._
* One nuance that the cleaner must handle is log truncation. If a log is truncated while it is being cleaned the cleaning of that log is aborted.
* Messages with null payload are treated as deletes for the purpose of log compaction. This means that they receive special treatment by the cleaner.
* The cleaner will only retain delete records for a period of time to avoid accumulating space indefinitely. This period of time is configurable on a per-topic
* basis and is measured from the time the segment enters the clean portion of the log (at which point any prior message with that key has been removed).
* Delete markers in the clean section of the log that are older than this time will not be retained when log segments are being recopied as part of cleaning.
* @param config Configuration parameters for the cleaner
* @param logDirs The directories where offset checkpoints reside
* @param logs The pool of logs
Expand All @@ -62,7 +67,7 @@ class LogCleaner(val config: CleanerConfig,
val logDirs: Array[File],
val logs: Pool[TopicAndPartition, Log],
time: Time = SystemTime) extends Logging {

/* the offset checkpoints holding the last cleaned point for each log */
private val checkpoints = => (dir, new OffsetCheckpoint(new File(dir, "cleaner-offset-checkpoint")))).toMap

Expand Down Expand Up @@ -160,12 +165,14 @@ class LogCleaner(val config: CleanerConfig,
* choosing the dirtiest log, cleaning it, and then swapping in the cleaned segments.
private class CleanerThread extends Thread {
if(config.dedupeBufferSize / config.numThreads > Int.MaxValue)
warn("Cannot use more than 2G of cleaner buffer space per cleaner thread, ignoring excess buffer space...")
val cleaner = new Cleaner(id = threadId.getAndIncrement(),
offsetMap = new SkimpyOffsetMap(memory = config.dedupeBufferSize / config.numThreads,
maxLoadFactor = config.dedupeBufferLoadFactor,
offsetMap = new SkimpyOffsetMap(memory = math.min(config.dedupeBufferSize / config.numThreads, Int.MaxValue).toInt,
hashAlgorithm = config.hashAlgorithm),
ioBufferSize = config.ioBufferSize / config.numThreads / 2,
maxIoBufferSize = config.maxMessageSize,
dupBufferLoadFactor = config.dedupeBufferLoadFactor,
throttler = throttler,
time = time)

Expand Down Expand Up @@ -251,13 +258,20 @@ private[log] class Cleaner(val id: Int,
offsetMap: OffsetMap,
ioBufferSize: Int,
maxIoBufferSize: Int,
dupBufferLoadFactor: Double,
throttler: Throttler,
time: Time) extends Logging {

this.logIdent = "Cleaner " + id + ":"
this.logIdent = "Cleaner " + id + ": "

/* stats on this cleaning */
val stats = new CleanerStats(time)
private var readBuffer = ByteBuffer.allocate(ioBufferSize) // buffer for disk read I/O
private var writeBuffer = ByteBuffer.allocate(ioBufferSize) // buffer for disk write I/O

/* buffer used for read i/o */
private var readBuffer = ByteBuffer.allocate(ioBufferSize)

/* buffer used for write i/o */
private var writeBuffer = ByteBuffer.allocate(ioBufferSize)

* Clean the given log
Expand All @@ -268,22 +282,29 @@ private[log] class Cleaner(val id: Int,
private[log] def clean(cleanable: LogToClean): Long = {
val topic = cleanable.topicPartition.topic
val part = cleanable.topicPartition.partition
info("Beginning cleaning of %s-%d.".format(topic, part))
info("Beginning cleaning of log %s.".format(
val log = cleanable.log
val truncateCount = log.numberOfTruncates

// build the offset map
val upperBoundOffset = math.min(log.activeSegment.baseOffset, cleanable.firstDirtyOffset + offsetMap.capacity)
info("Building offset map for %s...".format(
val upperBoundOffset = log.activeSegment.baseOffset
val endOffset = buildOffsetMap(log, cleanable.firstDirtyOffset, upperBoundOffset, offsetMap) + 1

// group the segments and clean the groups
for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize)) {
info("Cleaning segments %s for log %s...".format(","),
cleanSegments(log, group, offsetMap, truncateCount)
// figure out the timestamp below which it is safe to remove delete tombstones
// this position is defined to be a configurable time beneath the last modified time of the last clean segment
val deleteHorizonMs =
log.logSegments(0, cleanable.firstDirtyOffset).lastOption match {
case None => 0L
case Some(seg) => seg.lastModified - log.config.deleteRetentionMs

// group the segments and clean the groups
info("Cleaning log %s (discarding tombstones prior to %s)...".format(, new Date(deleteHorizonMs)))
for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize))
cleanSegments(log, group, offsetMap, truncateCount, deleteHorizonMs)

Expand All @@ -295,8 +316,13 @@ private[log] class Cleaner(val id: Int,
* @param segments The group of segments being cleaned
* @param map The offset map to use for cleaning segments
* @param expectedTruncateCount A count used to check if the log is being truncated and rewritten under our feet
* @param deleteHorizonMs The time to retain delete tombstones
private[log] def cleanSegments(log: Log, segments: Seq[LogSegment], map: OffsetMap, expectedTruncateCount: Int) {
private[log] def cleanSegments(log: Log,
segments: Seq[LogSegment],
map: OffsetMap,
expectedTruncateCount: Int,
deleteHorizonMs: Long) {
// create a new segment with the suffix .cleaned appended to both the log and index name
val logFile = new File(segments.head.log.file.getPath + Log.CleanedFileSuffix)
Expand All @@ -307,17 +333,25 @@ private[log] class Cleaner(val id: Int,
val cleaned = new LogSegment(messages, index, segments.head.baseOffset, segments.head.indexIntervalBytes, time)

// clean segments into the new destination segment
for (old <- segments)
cleanInto(old, cleaned, map)
for (old <- segments) {
val retainDeletes = old.lastModified > deleteHorizonMs
info("Cleaning segment %s in log %s (last modified %s) into %s, %s deletes."
.format(old.baseOffset,, new Date(old.lastModified), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding"))
cleanInto(old, cleaned, map, retainDeletes)

// trim excess index

// flush new segment to disk before swap

// update the modification date to retain the last modified date of the original files
val modified = segments.last.lastModified
cleaned.lastModified = modified

// swap in new segment
info("Swapping in cleaned segment %d for %s in log %s.".format(cleaned.baseOffset,","),
info("Swapping in cleaned segment %d for segment(s) %s in log %s.".format(cleaned.baseOffset,","),
try {
log.replaceSegments(cleaned, segments, expectedTruncateCount)
} catch {
Expand All @@ -334,10 +368,11 @@ private[log] class Cleaner(val id: Int,
* @param source The dirty log segment
* @param dest The cleaned log segment
* @param map The key=>offset mapping
* @param retainDeletes Should delete tombstones be retained while cleaning this segment
* TODO: Implement proper compression support
private[log] def cleanInto(source: LogSegment, dest: LogSegment, map: OffsetMap) {
private[log] def cleanInto(source: LogSegment, dest: LogSegment, map: OffsetMap, retainDeletes: Boolean) {
var position = 0
while (position < source.log.sizeInBytes) {
Expand All @@ -355,10 +390,14 @@ private[log] class Cleaner(val id: Int,
val key = entry.message.key
require(key != null, "Found null key in log segment %s which is marked as dedupe.".format(source.log.file.getAbsolutePath))
val lastOffset = map.get(key)
/* retain the record if it isn't present in the map OR it is present but this offset is the highest (and it's not a delete) */
val retainRecord = lastOffset < 0 || (entry.offset >= lastOffset && entry.message.payload != null)
if (retainRecord) {
val foundOffset = map.get(key)
/* two cases in which we can get rid of a message:
* 1) if there exists a message with the same key but higher offset
* 2) if the message is a delete "tombstone" marker and enough time has passed
val redundant = foundOffset >= 0 && entry.offset < foundOffset
val obsoleteDelete = !retainDeletes && entry.message.isNull
if (!redundant && !obsoleteDelete) {
ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset)
Expand Down Expand Up @@ -443,13 +482,18 @@ private[log] class Cleaner(val id: Int,
private[log] def buildOffsetMap(log: Log, start: Long, end: Long, map: OffsetMap): Long = {
val segments = log.logSegments(start, end)
info("Building offset map for log %s for %d segments in offset range [%d, %d).".format(, segments.size, start, end))
var offset = segments.head.baseOffset
val dirty = log.logSegments(start, end).toSeq
info("Building offset map for log %s for %d segments in offset range [%d, %d).".format(, dirty.size, start, end))

// Add all the dirty segments. We must take at least map.slots * load_factor,
// but we may be able to fit more (if there is lots of duplication in the dirty section of the log)
var offset = dirty.head.baseOffset
require(offset == start, "Last clean offset is %d but segment base offset is %d for log %s.".format(start, offset,
for (segment <- segments) {
val minStopOffset = (start + map.slots * this.dupBufferLoadFactor).toLong
for (segment <- dirty) {
offset = buildOffsetMap(segment, map)
if(segment.baseOffset <= minStopOffset || map.utilization < this.dupBufferLoadFactor)
offset = buildOffsetMap(segment, map)
info("Offset map for log %s complete.".format(
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/scala/kafka/log/LogConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import kafka.common._
* @param maxIndexSize The maximum size of an index file
* @param indexInterval The approximate number of bytes between index entries
* @param fileDeleteDelayMs The time to wait before deleting a file from the filesystem
* @param deleteRetentionMs The time to retain delete markers in the log. Only applicable for logs that are being compacted.
* @param minCleanableRatio The ratio of bytes that are available for cleaning to the bytes already cleaned
* @param dedupe Should old segments in this log be deleted or deduplicated?
Expand All @@ -45,6 +46,7 @@ case class LogConfig(val segmentSize: Int = 1024*1024,
val maxIndexSize: Int = 1024*1024,
val indexInterval: Int = 4096,
val fileDeleteDelayMs: Long = 60*1000,
val deleteRetentionMs: Long = 24 * 60 * 60 * 1000L,
val minCleanableRatio: Double = 0.5,
val dedupe: Boolean = false) {

Expand All @@ -60,6 +62,7 @@ case class LogConfig(val segmentSize: Int = 1024*1024,
props.put(RententionMsProp, retentionMs.toString)
props.put(MaxMessageBytesProp, maxMessageSize.toString)
props.put(IndexIntervalBytesProp, indexInterval.toString)
props.put(DeleteRetentionMsProp, deleteRetentionMs.toString)
props.put(FileDeleteDelayMsProp, fileDeleteDelayMs.toString)
props.put(MinCleanableDirtyRatioProp, minCleanableRatio.toString)
props.put(CleanupPolicyProp, if(dedupe) "dedupe" else "delete")
Expand All @@ -78,6 +81,7 @@ object LogConfig {
val RententionMsProp = ""
val MaxMessageBytesProp = "max.message.bytes"
val IndexIntervalBytesProp = "index.interval.bytes"
val DeleteRetentionMsProp = ""
val FileDeleteDelayMsProp = ""
val MinCleanableDirtyRatioProp = "min.cleanable.dirty.ratio"
val CleanupPolicyProp = "cleanup.policy"
Expand All @@ -92,6 +96,7 @@ object LogConfig {

Expand All @@ -110,6 +115,7 @@ object LogConfig {
maxMessageSize = props.getProperty(MaxMessageBytesProp).toInt,
indexInterval = props.getProperty(IndexIntervalBytesProp).toInt,
fileDeleteDelayMs = props.getProperty(FileDeleteDelayMsProp).toInt,
deleteRetentionMs = props.getProperty(DeleteRetentionMsProp).toLong,
minCleanableRatio = props.getProperty(MinCleanableDirtyRatioProp).toDouble,
dedupe = props.getProperty(CleanupPolicyProp).trim.toLowerCase == "dedupe")
Expand Down
9 changes: 8 additions & 1 deletion core/src/main/scala/kafka/log/LogSegment.scala
Original file line number Diff line number Diff line change
Expand Up @@ -249,5 +249,12 @@ class LogSegment(val log: FileMessageSet,
* The last modified time of this log segment as a unix time stamp
def lastModified = log.file.lastModified

* Change the last modified time for this log segment
def lastModified_=(ms: Long) = {

0 comments on commit 9ff4e8e

Please sign in to comment.