From 9ff4e8eb10e0ddd86f257e99d55971a132426605 Mon Sep 17 00:00:00 2001 From: Jay Kreps Date: Tue, 12 Mar 2013 11:17:12 -0700 Subject: [PATCH] KAFKA-739 Handle null message payloads in messages and in the log cleaner. Reviewed by Jun and Neha. --- .../kafka/etl/impl/SimpleKafkaETLMapper.java | 2 + .../kafka/consumer/ConsumerIterator.scala | 2 +- .../main/scala/kafka/log/CleanerConfig.scala | 4 +- .../main/scala/kafka/log/FileMessageSet.scala | 3 +- .../src/main/scala/kafka/log/LogCleaner.scala | 104 ++++++--- core/src/main/scala/kafka/log/LogConfig.scala | 6 + .../src/main/scala/kafka/log/LogSegment.scala | 9 +- core/src/main/scala/kafka/log/OffsetMap.scala | 103 ++++++--- .../main/scala/kafka/message/Message.scala | 16 +- .../producer/async/DefaultEventHandler.scala | 5 +- .../main/scala/kafka/server/KafkaConfig.scala | 14 +- .../main/scala/kafka/server/KafkaServer.scala | 2 + .../scala/kafka/tools/DumpLogSegments.scala | 5 +- .../kafka/tools/SimpleConsumerShell.scala | 2 +- .../scala/kafka/utils/IteratorTemplate.scala | 17 +- .../kafka/utils/VerifiableProperties.scala | 6 +- .../scala/other/kafka/TestLogCleaning.scala | 198 +++++++++++++----- .../scala/unit/kafka/log/CleanerTest.scala | 66 +++--- .../test/scala/unit/kafka/log/LogTest.scala | 13 ++ .../scala/unit/kafka/log/OffsetMapTest.scala | 43 ++-- .../unit/kafka/message/MessageTest.scala | 9 +- .../unit/kafka/producer/ProducerTest.scala | 25 ++- .../kafka/utils/IteratorTemplateTest.scala | 41 ++++ 23 files changed, 504 insertions(+), 191 deletions(-) create mode 100644 core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java index b0aadff332d6..51b6adf3e125 100644 --- a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java +++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java @@ -41,6 +41,8 @@ public class SimpleKafkaETLMapper implements protected Text getData(Message message) throws IOException { ByteBuffer buf = message.payload(); + if(buf == null) + return new Text(); byte[] array = new byte[buf.limit()]; buf.get(array); diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala index 746a4bd4eaaa..963a3a9d1e8d 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala @@ -108,7 +108,7 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk val keyBuffer = item.message.key val key = if(keyBuffer == null) null.asInstanceOf[K] else keyDecoder.fromBytes(Utils.readBytes(keyBuffer)) - val value = valueDecoder.fromBytes(Utils.readBytes(item.message.payload)) + val value = if(item.message.isNull) null.asInstanceOf[V] else valueDecoder.fromBytes(Utils.readBytes(item.message.payload)) new MessageAndMetadata(key, value, currentTopicInfo.topic, currentTopicInfo.partitionId, item.offset) } diff --git a/core/src/main/scala/kafka/log/CleanerConfig.scala b/core/src/main/scala/kafka/log/CleanerConfig.scala index 999fee868c17..fa946ada9260 100644 --- a/core/src/main/scala/kafka/log/CleanerConfig.scala +++ b/core/src/main/scala/kafka/log/CleanerConfig.scala @@ -30,8 +30,8 @@ package kafka.log * @param hashAlgorithm The hash algorithm to use in key comparison. 
*/ case class CleanerConfig(val numThreads: Int = 1, - val dedupeBufferSize: Int = 4*1024*1024, - val dedupeBufferLoadFactor: Double = 0.75, + val dedupeBufferSize: Long = 4*1024*1024L, + val dedupeBufferLoadFactor: Double = 0.9d, val ioBufferSize: Int = 1024*1024, val maxMessageSize: Int = 32*1024*1024, val maxIoBytesPerSecond: Double = Double.MaxValue, diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala index 0eef33e20f37..abb160cdafad 100644 --- a/core/src/main/scala/kafka/log/FileMessageSet.scala +++ b/core/src/main/scala/kafka/log/FileMessageSet.scala @@ -159,13 +159,14 @@ class FileMessageSet private[kafka](@volatile var file: File, def iterator(maxMessageSize: Int): Iterator[MessageAndOffset] = { new IteratorTemplate[MessageAndOffset] { var location = start + val sizeOffsetBuffer = ByteBuffer.allocate(12) override def makeNext(): MessageAndOffset = { if(location >= end) return allDone() // read the size of the item - val sizeOffsetBuffer = ByteBuffer.allocate(12) + sizeOffsetBuffer.rewind() channel.read(sizeOffsetBuffer, location) if(sizeOffsetBuffer.hasRemaining) return allDone() diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 368a12be1f95..ccde2abd99d2 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -23,6 +23,7 @@ import java.nio._ import java.util.concurrent.Semaphore import java.util.concurrent.TimeUnit import java.util.concurrent.atomic._ +import java.util.Date import java.io.File import kafka.common._ import kafka.message._ @@ -39,8 +40,7 @@ import kafka.utils._ * The cleaning is carried out by a pool of background threads. Each thread chooses the dirtiest log that has the "dedupe" retention policy * and cleans that. The dirtiness of the log is guessed by taking the ratio of bytes in the dirty section of the log to the total bytes in the log. * - * To clean a log the cleaner first builds a mapping of key=>last_offset for the dirty section of the log. For memory efficiency this mapping - * is approximate. That is allowed to lose some key=>offset pairs, but never to return a wrong answer. See kafka.log.OffsetMap for details of + * To clean a log the cleaner first builds a mapping of key=>last_offset for the dirty section of the log. See kafka.log.OffsetMap for details of * the implementation of the mapping. * * Once the key=>offset map is built, the log is cleaned by recopying each log segment but omitting any key that appears in the offset map with a @@ -53,6 +53,11 @@ import kafka.utils._ * * One nuance that the cleaner must handle is log truncation. If a log is truncated while it is being cleaned the cleaning of that log is aborted. * + * Messages with null payload are treated as deletes for the purpose of log compaction. This means that they receive special treatment by the cleaner. + * The cleaner will only retain delete records for a period of time to avoid accumulating space indefinitely. This period of time is configurable on a per-topic + * basis and is measured from the time the segment enters the clean portion of the log (at which point any prior message with that key has been removed). + * Delete markers in the clean section of the log that are older than this time will not be retained when log segments are being recopied as part of cleaning. 
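+ * As an illustration (hypothetical keys and offsets): given a compacted log containing
+ *   0:(K1,v1)  1:(K2,v2)  2:(K1,v3)  3:(K2,null)  4:(K1,v4)
+ * a cleaning pass keeps 4:(K1,v4) as the latest value for K1 and discards offsets 0-2; the tombstone 3:(K2,null)
+ * survives this pass and is only dropped on a later pass, once it has sat in the clean portion of the log for
+ * longer than the configured delete retention time.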
+ * * @param config Configuration parameters for the cleaner * @param logDirs The directories where offset checkpoints reside * @param logs The pool of logs @@ -62,7 +67,7 @@ class LogCleaner(val config: CleanerConfig, val logDirs: Array[File], val logs: Pool[TopicAndPartition, Log], time: Time = SystemTime) extends Logging { - + /* the offset checkpoints holding the last cleaned point for each log */ private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, "cleaner-offset-checkpoint")))).toMap @@ -160,12 +165,14 @@ class LogCleaner(val config: CleanerConfig, * choosing the dirtiest log, cleaning it, and then swapping in the cleaned segments. */ private class CleanerThread extends Thread { + if(config.dedupeBufferSize / config.numThreads > Int.MaxValue) + warn("Cannot use more than 2G of cleaner buffer space per cleaner thread, ignoring excess buffer space...") val cleaner = new Cleaner(id = threadId.getAndIncrement(), - offsetMap = new SkimpyOffsetMap(memory = config.dedupeBufferSize / config.numThreads, - maxLoadFactor = config.dedupeBufferLoadFactor, + offsetMap = new SkimpyOffsetMap(memory = math.min(config.dedupeBufferSize / config.numThreads, Int.MaxValue).toInt, hashAlgorithm = config.hashAlgorithm), ioBufferSize = config.ioBufferSize / config.numThreads / 2, maxIoBufferSize = config.maxMessageSize, + dupBufferLoadFactor = config.dedupeBufferLoadFactor, throttler = throttler, time = time) @@ -251,13 +258,20 @@ private[log] class Cleaner(val id: Int, offsetMap: OffsetMap, ioBufferSize: Int, maxIoBufferSize: Int, + dupBufferLoadFactor: Double, throttler: Throttler, time: Time) extends Logging { - this.logIdent = "Cleaner " + id + ":" + this.logIdent = "Cleaner " + id + ": " + + /* stats on this cleaning */ val stats = new CleanerStats(time) - private var readBuffer = ByteBuffer.allocate(ioBufferSize) // buffer for disk read I/O - private var writeBuffer = ByteBuffer.allocate(ioBufferSize) // buffer for disk write I/O + + /* buffer used for read i/o */ + private var readBuffer = ByteBuffer.allocate(ioBufferSize) + + /* buffer used for write i/o */ + private var writeBuffer = ByteBuffer.allocate(ioBufferSize) /** * Clean the given log @@ -268,22 +282,29 @@ private[log] class Cleaner(val id: Int, */ private[log] def clean(cleanable: LogToClean): Long = { stats.clear() - val topic = cleanable.topicPartition.topic - val part = cleanable.topicPartition.partition - info("Beginning cleaning of %s-%d.".format(topic, part)) + info("Beginning cleaning of log %s.".format(cleanable.log.name)) val log = cleanable.log val truncateCount = log.numberOfTruncates // build the offset map - val upperBoundOffset = math.min(log.activeSegment.baseOffset, cleanable.firstDirtyOffset + offsetMap.capacity) + info("Building offset map for %s...".format(cleanable.log.name)) + val upperBoundOffset = log.activeSegment.baseOffset val endOffset = buildOffsetMap(log, cleanable.firstDirtyOffset, upperBoundOffset, offsetMap) + 1 stats.indexDone() - // group the segments and clean the groups - for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize)) { - info("Cleaning segments %s for log %s...".format(group.map(_.baseOffset).mkString(","), log.name)) - cleanSegments(log, group, offsetMap, truncateCount) + // figure out the timestamp below which it is safe to remove delete tombstones + // this position is defined to be a configurable time beneath the last modified time of the last clean segment + val deleteHorizonMs = + 
log.logSegments(0, cleanable.firstDirtyOffset).lastOption match { + case None => 0L + case Some(seg) => seg.lastModified - log.config.deleteRetentionMs } + + // group the segments and clean the groups + info("Cleaning log %s (discarding tombstones prior to %s)...".format(log.name, new Date(deleteHorizonMs))) + for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize)) + cleanSegments(log, group, offsetMap, truncateCount, deleteHorizonMs) + stats.allDone() endOffset } @@ -295,8 +316,13 @@ private[log] class Cleaner(val id: Int, * @param segments The group of segments being cleaned * @param map The offset map to use for cleaning segments * @param expectedTruncateCount A count used to check if the log is being truncated and rewritten under our feet + * @param deleteHorizonMs The time to retain delete tombstones */ - private[log] def cleanSegments(log: Log, segments: Seq[LogSegment], map: OffsetMap, expectedTruncateCount: Int) { + private[log] def cleanSegments(log: Log, + segments: Seq[LogSegment], + map: OffsetMap, + expectedTruncateCount: Int, + deleteHorizonMs: Long) { // create a new segment with the suffix .cleaned appended to both the log and index name val logFile = new File(segments.head.log.file.getPath + Log.CleanedFileSuffix) logFile.delete() @@ -307,17 +333,25 @@ private[log] class Cleaner(val id: Int, val cleaned = new LogSegment(messages, index, segments.head.baseOffset, segments.head.indexIntervalBytes, time) // clean segments into the new destination segment - for (old <- segments) - cleanInto(old, cleaned, map) + for (old <- segments) { + val retainDeletes = old.lastModified > deleteHorizonMs + info("Cleaning segment %s in log %s (last modified %s) into %s, %s deletes." + .format(old.baseOffset, log.name, new Date(old.lastModified), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding")) + cleanInto(old, cleaned, map, retainDeletes) + } // trim excess index index.trimToValidSize() // flush new segment to disk before swap cleaned.flush() + + // update the modification date to retain the last modified date of the original files + val modified = segments.last.lastModified + cleaned.lastModified = modified // swap in new segment - info("Swapping in cleaned segment %d for %s in log %s.".format(cleaned.baseOffset, segments.map(_.baseOffset).mkString(","), log.name)) + info("Swapping in cleaned segment %d for segment(s) %s in log %s.".format(cleaned.baseOffset, segments.map(_.baseOffset).mkString(","), log.name)) try { log.replaceSegments(cleaned, segments, expectedTruncateCount) } catch { @@ -334,10 +368,11 @@ private[log] class Cleaner(val id: Int, * @param source The dirty log segment * @param dest The cleaned log segment * @param map The key=>offset mapping + * @param retainDeletes Should delete tombstones be retained while cleaning this segment * * TODO: Implement proper compression support */ - private[log] def cleanInto(source: LogSegment, dest: LogSegment, map: OffsetMap) { + private[log] def cleanInto(source: LogSegment, dest: LogSegment, map: OffsetMap, retainDeletes: Boolean) { var position = 0 while (position < source.log.sizeInBytes) { checkDone() @@ -355,10 +390,14 @@ private[log] class Cleaner(val id: Int, stats.readMessage(size) val key = entry.message.key require(key != null, "Found null key in log segment %s which is marked as dedupe.".format(source.log.file.getAbsolutePath)) - val lastOffset = map.get(key) - /* retain the record if it isn't present in the map OR it is present but this offset is 
the highest (and it's not a delete) */ - val retainRecord = lastOffset < 0 || (entry.offset >= lastOffset && entry.message.payload != null) - if (retainRecord) { + val foundOffset = map.get(key) + /* two cases in which we can get rid of a message: + * 1) if there exists a message with the same key but higher offset + * 2) if the message is a delete "tombstone" marker and enough time has passed + */ + val redundant = foundOffset >= 0 && entry.offset < foundOffset + val obsoleteDelete = !retainDeletes && entry.message.isNull + if (!redundant && !obsoleteDelete) { ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset) stats.recopyMessage(size) } @@ -443,13 +482,18 @@ private[log] class Cleaner(val id: Int, */ private[log] def buildOffsetMap(log: Log, start: Long, end: Long, map: OffsetMap): Long = { map.clear() - val segments = log.logSegments(start, end) - info("Building offset map for log %s for %d segments in offset range [%d, %d).".format(log.name, segments.size, start, end)) - var offset = segments.head.baseOffset + val dirty = log.logSegments(start, end).toSeq + info("Building offset map for log %s for %d segments in offset range [%d, %d).".format(log.name, dirty.size, start, end)) + + // Add all the dirty segments. We must take at least map.slots * load_factor, + // but we may be able to fit more (if there is lots of duplication in the dirty section of the log) + var offset = dirty.head.baseOffset require(offset == start, "Last clean offset is %d but segment base offset is %d for log %s.".format(start, offset, log.name)) - for (segment <- segments) { + val minStopOffset = (start + map.slots * this.dupBufferLoadFactor).toLong + for (segment <- dirty) { checkDone() - offset = buildOffsetMap(segment, map) + if(segment.baseOffset <= minStopOffset || map.utilization < this.dupBufferLoadFactor) + offset = buildOffsetMap(segment, map) } info("Offset map for log %s complete.".format(log.name)) offset diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index dc42a74774f7..48660bcd022c 100644 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -32,6 +32,7 @@ import kafka.common._ * @param maxIndexSize The maximum size of an index file * @param indexInterval The approximate number of bytes between index entries * @param fileDeleteDelayMs The time to wait before deleting a file from the filesystem + * @param deleteRetentionMs The time to retain delete markers in the log. Only applicable for logs that are being compacted. * @param minCleanableRatio The ratio of bytes that are available for cleaning to the bytes already cleaned * @param dedupe Should old segments in this log be deleted or deduplicated? 
*/ @@ -45,6 +46,7 @@ case class LogConfig(val segmentSize: Int = 1024*1024, val maxIndexSize: Int = 1024*1024, val indexInterval: Int = 4096, val fileDeleteDelayMs: Long = 60*1000, + val deleteRetentionMs: Long = 24 * 60 * 60 * 1000L, val minCleanableRatio: Double = 0.5, val dedupe: Boolean = false) { @@ -60,6 +62,7 @@ case class LogConfig(val segmentSize: Int = 1024*1024, props.put(RententionMsProp, retentionMs.toString) props.put(MaxMessageBytesProp, maxMessageSize.toString) props.put(IndexIntervalBytesProp, indexInterval.toString) + props.put(DeleteRetentionMsProp, deleteRetentionMs.toString) props.put(FileDeleteDelayMsProp, fileDeleteDelayMs.toString) props.put(MinCleanableDirtyRatioProp, minCleanableRatio.toString) props.put(CleanupPolicyProp, if(dedupe) "dedupe" else "delete") @@ -78,6 +81,7 @@ object LogConfig { val RententionMsProp = "retention.ms" val MaxMessageBytesProp = "max.message.bytes" val IndexIntervalBytesProp = "index.interval.bytes" + val DeleteRetentionMsProp = "delete.retention.ms" val FileDeleteDelayMsProp = "file.delete.delay.ms" val MinCleanableDirtyRatioProp = "min.cleanable.dirty.ratio" val CleanupPolicyProp = "cleanup.policy" @@ -92,6 +96,7 @@ object LogConfig { MaxMessageBytesProp, IndexIntervalBytesProp, FileDeleteDelayMsProp, + DeleteRetentionMsProp, MinCleanableDirtyRatioProp, CleanupPolicyProp) @@ -110,6 +115,7 @@ object LogConfig { maxMessageSize = props.getProperty(MaxMessageBytesProp).toInt, indexInterval = props.getProperty(IndexIntervalBytesProp).toInt, fileDeleteDelayMs = props.getProperty(FileDeleteDelayMsProp).toInt, + deleteRetentionMs = props.getProperty(DeleteRetentionMsProp).toLong, minCleanableRatio = props.getProperty(MinCleanableDirtyRatioProp).toDouble, dedupe = props.getProperty(CleanupPolicyProp).trim.toLowerCase == "dedupe") } diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 120ebeb7f37a..30d2e91daf82 100644 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -249,5 +249,12 @@ class LogSegment(val log: FileMessageSet, * The last modified time of this log segment as a unix time stamp */ def lastModified = log.file.lastModified - + + /** + * Change the last modified time for this log segment + */ + def lastModified_=(ms: Long) = { + log.file.setLastModified(ms) + index.file.setLastModified(ms) + } } \ No newline at end of file diff --git a/core/src/main/scala/kafka/log/OffsetMap.scala b/core/src/main/scala/kafka/log/OffsetMap.scala index 623681361e42..42cdfbb6100b 100644 --- a/core/src/main/scala/kafka/log/OffsetMap.scala +++ b/core/src/main/scala/kafka/log/OffsetMap.scala @@ -23,22 +23,22 @@ import java.nio.ByteBuffer import kafka.utils._ trait OffsetMap { - def capacity: Int + def slots: Int def put(key: ByteBuffer, offset: Long) def get(key: ByteBuffer): Long def clear() def size: Int - def utilization: Double = size.toDouble / capacity + def utilization: Double = size.toDouble / slots } /** - * An approximate map used for deduplicating the log. + * An hash table used for deduplicating the log. This hash table uses a cryptographicly secure hash of the key as a proxy for the key + * for comparisons and to save space on object overhead. Collisions are resolved by probing. This hash table does not support deletes. 
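+ * As a sizing illustration (using the defaults that appear elsewhere in this patch): with the "MD5" hash each entry
+ * occupies 16 + 8 = 24 bytes, so a 4*1024*1024 byte dedupe buffer yields 4194304 / 24 = 174762 slots, and at the
+ * default 0.9 load factor roughly 157,000 distinct keys can be indexed in a single cleaning pass.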
* @param memory The amount of memory this map can use - * @param maxLoadFactor The maximum percent full this offset map can be * @param hashAlgorithm The hash algorithm instance to use: MD2, MD5, SHA-1, SHA-256, SHA-384, SHA-512 */ @nonthreadsafe -class SkimpyOffsetMap(val memory: Int, val maxLoadFactor: Double, val hashAlgorithm: String = "MD5") extends OffsetMap { +class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extends OffsetMap { private val bytes = ByteBuffer.allocate(memory) /* the hash algorithm instance to use, defualt is MD5 */ @@ -54,8 +54,11 @@ class SkimpyOffsetMap(val memory: Int, val maxLoadFactor: Double, val hashAlgori /* number of entries put into the map */ private var entries = 0 - /* a byte added as a prefix to all keys to make collisions non-static in repeated uses. Changed in clear(). */ - private var salt: Byte = 0 + /* number of lookups on the map */ + private var lookups = 0L + + /* the number of probes for all lookups */ + private var probes = 0L /** * The number of bytes of space each entry uses (the number of bytes in the hash plus an 8 byte offset) @@ -63,40 +66,66 @@ class SkimpyOffsetMap(val memory: Int, val maxLoadFactor: Double, val hashAlgori val bytesPerEntry = hashSize + 8 /** - * The maximum number of entries this map can contain before it exceeds the max load factor + * The maximum number of entries this map can contain */ - override val capacity: Int = (maxLoadFactor * memory / bytesPerEntry).toInt + val slots: Int = (memory / bytesPerEntry).toInt /** - * Associate a offset with a key. + * Associate this offset to the given key. * @param key The key * @param offset The offset */ override def put(key: ByteBuffer, offset: Long) { - if(size + 1 > capacity) - throw new IllegalStateException("Attempt to add to a full offset map with a maximum capacity of %d.".format(capacity)) - hash(key, hash1) - bytes.position(offsetFor(hash1)) + require(entries < slots, "Attempt to add a new entry to a full offset map.") + lookups += 1 + hashInto(key, hash1) + // probe until we find the first empty slot + var attempt = 0 + var pos = positionOf(hash1, attempt) + while(!isEmpty(pos)) { + bytes.position(pos) + bytes.get(hash2) + if(Arrays.equals(hash1, hash2)) { + // we found an existing entry, overwrite it and return (size does not change) + bytes.putLong(offset) + return + } + attempt += 1 + pos = positionOf(hash1, attempt) + } + // found an empty slot, update it--size grows by 1 + bytes.position(pos) bytes.put(hash1) bytes.putLong(offset) entries += 1 } /** - * Get the offset associated with this key. This method is approximate, - * it may not find an offset previously stored, but cannot give a wrong offset. + * Check that there is no entry at the given position + */ + private def isEmpty(position: Int): Boolean = + bytes.getLong(position) == 0 && bytes.getLong(position + 8) == 0 && bytes.getLong(position + 16) == 0 + + /** + * Get the offset associated with this key. 
* @param key The key * @return The offset associated with this key or -1 if the key is not found */ override def get(key: ByteBuffer): Long = { - hash(key, hash1) - bytes.position(offsetFor(hash1)) - bytes.get(hash2) - // if the computed hash equals the stored hash return the stored offset - if(Arrays.equals(hash1, hash2)) - bytes.getLong() - else - -1L + lookups += 1 + hashInto(key, hash1) + // search for the hash of this key by repeated probing until we find the hash we are looking for or we find an empty slot + var attempt = 0 + var pos = 0 + do { + pos = positionOf(hash1, attempt) + bytes.position(pos) + if(isEmpty(pos)) + return -1L + bytes.get(hash2) + attempt += 1 + } while(!Arrays.equals(hash1, hash2)) + bytes.getLong() } /** @@ -105,7 +134,8 @@ class SkimpyOffsetMap(val memory: Int, val maxLoadFactor: Double, val hashAlgori */ override def clear() { this.entries = 0 - this.salt = (this.salt + 1).toByte + this.lookups = 0L + this.probes = 0L Arrays.fill(bytes.array, bytes.arrayOffset, bytes.arrayOffset + bytes.limit, 0.toByte) } @@ -115,19 +145,32 @@ class SkimpyOffsetMap(val memory: Int, val maxLoadFactor: Double, val hashAlgori override def size: Int = entries /** - * Choose a slot in the array for this hash + * The rate of collisions in the lookups */ - private def offsetFor(hash: Array[Byte]): Int = - bytesPerEntry * (Utils.abs(Utils.readInt(hash, 0)) % capacity) + def collisionRate: Double = + (this.probes - this.lookups) / this.lookups.toDouble + + /** + * Calculate the ith probe position. We first try reading successive integers from the hash itself + * then if all of those fail we degrade to linear probing. + * @param hash The hash of the key to find the position for + * @param attempt The ith probe + * @return The byte offset in the buffer at which the ith probing for the given hash would reside + */ + private def positionOf(hash: Array[Byte], attempt: Int): Int = { + val probe = Utils.readInt(hash, math.min(attempt, hashSize - 4)) + math.max(0, attempt - hashSize + 4) + val slot = Utils.abs(probe) % slots + this.probes += 1 + slot * bytesPerEntry + } /** * The offset at which we have stored the given key * @param key The key to hash * @param buffer The buffer to store the hash into */ - private def hash(key: ByteBuffer, buffer: Array[Byte]) { + private def hashInto(key: ByteBuffer, buffer: Array[Byte]) { key.mark() - digest.update(salt) digest.update(key) key.reset() digest.digest(buffer, 0, hashSize) diff --git a/core/src/main/scala/kafka/message/Message.scala b/core/src/main/scala/kafka/message/Message.scala index 12a83681dbd5..1a43fdfc37c7 100644 --- a/core/src/main/scala/kafka/message/Message.scala +++ b/core/src/main/scala/kafka/message/Message.scala @@ -101,7 +101,9 @@ class Message(val buffer: ByteBuffer) { Message.KeySizeLength + (if(key == null) 0 else key.length) + Message.ValueSizeLength + - (if(payloadSize >= 0) payloadSize else bytes.length - payloadOffset))) + (if(bytes == null) 0 + else if(payloadSize >= 0) payloadSize + else bytes.length - payloadOffset))) // skip crc, we will fill that in at the end buffer.position(MagicOffset) buffer.put(CurrentMagicValue) @@ -115,9 +117,12 @@ class Message(val buffer: ByteBuffer) { buffer.putInt(key.length) buffer.put(key, 0, key.length) } - val size = if(payloadSize >= 0) payloadSize else bytes.length - payloadOffset + val size = if(bytes == null) -1 + else if(payloadSize >= 0) payloadSize + else bytes.length - payloadOffset buffer.putInt(size) - buffer.put(bytes, payloadOffset, size) + if(bytes != null) + 
buffer.put(bytes, payloadOffset, size) buffer.rewind() // now compute the checksum and fill it in @@ -185,6 +190,11 @@ class Message(val buffer: ByteBuffer) { */ def payloadSize: Int = buffer.getInt(payloadSizeOffset) + /** + * Is the payload of this message null + */ + def isNull(): Boolean = payloadSize < 0 + /** * The magic version of this message */ diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala index 27b16e323964..2e3e38366ee1 100644 --- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala +++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala @@ -249,7 +249,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig, if (logger.isTraceEnabled) { val successfullySentData = response.status.filter(_._2.error == ErrorMapping.NoError) successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message => - trace("Successfully sent message: %s".format(Utils.readString(message.message.payload))))) + trace("Successfully sent message: %s".format(if(message.message.isNull) null else Utils.readString(message.message.payload))))) } failedTopicPartitions = response.status.filter(_._2.error != ErrorMapping.NoError).toSeq .map(partitionStatus => partitionStatus._1) @@ -257,8 +257,9 @@ class DefaultEventHandler[K,V](config: ProducerConfig, error("Produce request with correlation id %d failed due to response %s. List of failed topic partitions is %s" .format(currentCorrelationId, response.toString, failedTopicPartitions.mkString(","))) failedTopicPartitions - } else + } else { Seq.empty[TopicAndPartition] + } } catch { case t: Throwable => warn("Failed to send producer request with correlation id %d to broker %d with data for partitions %s" diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 006caa7db8e2..5e4c9cadb152 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -109,9 +109,6 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* the default cleanup policy for segments beyond the retention window, must be either "delete" or "dedupe" */ val logCleanupPolicy = props.getString("log.cleanup.policy", "delete") - /* a per-topic override for the cleanup policy for segments beyond the retention window */ - val logCleanupPolicyMap = props.getMap("topic.log.cleanup.policy") - /* the number of background threads to use for log cleaning */ val logCleanerThreads = props.getIntInRange("log.cleaner.threads", 1, (0, Int.MaxValue)) @@ -119,11 +116,15 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro val logCleanerIoMaxBytesPerSecond = props.getDouble("log.cleaner.io.max.bytes.per.second", Double.MaxValue) /* the total memory used for log deduplication across all cleaner threads */ - val logCleanerDedupeBufferSize = props.getIntInRange("log.cleaner.dedupe.buffer.size", 500*1024*1024, (0, Int.MaxValue)) + val logCleanerDedupeBufferSize = props.getLongInRange("log.cleaner.dedupe.buffer.size", 500*1024*1024L, (0, Long.MaxValue)) require(logCleanerDedupeBufferSize / logCleanerThreads > 1024*1024, "log.cleaner.dedupe.buffer.size must be at least 1MB per cleaner thread.") /* the total memory used for log cleaner I/O buffers across all cleaner threads */ - val logCleanerIoBufferSize = props.getIntInRange("log.cleaner.io.buffer.size", 4*1024*1024, (0, Int.MaxValue)) + val 
logCleanerIoBufferSize = props.getIntInRange("log.cleaner.io.buffer.size", 512*1024, (0, Int.MaxValue)) + + /* log cleaner dedupe buffer load factor. The percentage full the dedupe buffer can become. A higher value + * will allow more log to be cleaned at once but will lead to more hash collisions */ + val logCleanerDedupeBufferLoadFactor = props.getDouble("log.cleaner.io.buffer.load.factor", 0.9d) /* the amount of time to sleep when there are no logs to clean */ val logCleanerBackoffMs = props.getLongInRange("log.cleaner.backoff.ms", 30*1000, (0L, Long.MaxValue)) @@ -134,6 +135,9 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* should we enable log cleaning? */ val logCleanerEnable = props.getBoolean("log.cleaner.enable", false) + /* how long are delete records retained? */ + val logCleanerDeleteRetentionMs = props.getLong("log.cleaner.delete.retention.ms", 24 * 60 * 60 * 1000L) + /* the maximum size in bytes of the offset index */ val logIndexSizeMaxBytes = props.getIntInRange("log.index.size.max.bytes", 10*1024*1024, (4, Int.MaxValue)) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 9fa432d67058..e2f4e915fb59 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -163,6 +163,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg maxMessageSize = config.messageMaxBytes, maxIndexSize = config.logIndexSizeMaxBytes, indexInterval = config.logIndexIntervalBytes, + deleteRetentionMs = config.logCleanerDeleteRetentionMs, fileDeleteDelayMs = config.logDeleteDelayMs, minCleanableRatio = config.logCleanerMinCleanRatio, dedupe = config.logCleanupPolicy.trim.toLowerCase == "dedupe") @@ -171,6 +172,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg // read the log configurations from zookeeper val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads, dedupeBufferSize = config.logCleanerDedupeBufferSize, + dedupeBufferLoadFactor = config.logCleanerDedupeBufferLoadFactor, ioBufferSize = config.logCleanerIoBufferSize, maxMessageSize = config.messageMaxBytes, maxIoBytesPerSecond = config.logCleanerIoMaxBytesPerSecond, diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index 31333e7d7f61..d9546cafc314 100644 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -140,8 +140,9 @@ object DumpLogSegments { print(" keysize: " + msg.keySize) if(printContents) { if(msg.hasKey) - print(" key: " + Utils.readString(messageAndOffset.message.payload, "UTF-8")) - print(" payload: " + Utils.readString(messageAndOffset.message.payload, "UTF-8")) + print(" key: " + Utils.readString(messageAndOffset.message.key, "UTF-8")) + val payload = if(messageAndOffset.message.isNull) null else Utils.readString(messageAndOffset.message.payload, "UTF-8") + print(" payload: " + payload) } println() } diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala index d8127a8f2e4a..aa5e661acd65 100644 --- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala +++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala @@ -196,7 +196,7 @@ object SimpleConsumerShell extends Logging { System.out.println("next offset = " + offset) val message = 
messageAndOffset.message val key = if(message.hasKey) Utils.readBytes(message.key) else null - formatter.writeTo(key, Utils.readBytes(message.payload), System.out) + formatter.writeTo(key, if(message.isNull) null else Utils.readBytes(message.payload), System.out) } catch { case e => if (skipMessageOnError) diff --git a/core/src/main/scala/kafka/utils/IteratorTemplate.scala b/core/src/main/scala/kafka/utils/IteratorTemplate.scala index 301f9346aff8..fd952f3ec0f0 100644 --- a/core/src/main/scala/kafka/utils/IteratorTemplate.scala +++ b/core/src/main/scala/kafka/utils/IteratorTemplate.scala @@ -32,16 +32,21 @@ object FAILED extends State abstract class IteratorTemplate[T] extends Iterator[T] with java.util.Iterator[T] { private var state: State = NOT_READY - private var nextItem: Option[T] = None + private var nextItem = null.asInstanceOf[T] def next(): T = { if(!hasNext()) throw new NoSuchElementException() state = NOT_READY - nextItem match { - case Some(item) => item - case None => throw new IllegalStateException("Expected item but none found.") - } + if(nextItem == null) + throw new IllegalStateException("Expected item but none found.") + nextItem + } + + def peek(): T = { + if(!hasNext()) + throw new NoSuchElementException() + nextItem } def hasNext(): Boolean = { @@ -58,7 +63,7 @@ abstract class IteratorTemplate[T] extends Iterator[T] with java.util.Iterator[T def maybeComputeNext(): Boolean = { state = FAILED - nextItem = Some(makeNext()) + nextItem = makeNext() if(state == DONE) { false } else { diff --git a/core/src/main/scala/kafka/utils/VerifiableProperties.scala b/core/src/main/scala/kafka/utils/VerifiableProperties.scala index a2ac55ccee9e..9009a9d82cc6 100644 --- a/core/src/main/scala/kafka/utils/VerifiableProperties.scala +++ b/core/src/main/scala/kafka/utils/VerifiableProperties.scala @@ -18,6 +18,7 @@ package kafka.utils import java.util.Properties +import java.util.Collections import scala.collection._ class VerifiableProperties(val props: Properties) extends Logging { @@ -194,9 +195,8 @@ class VerifiableProperties(val props: Properties) extends Logging { def verify() { info("Verifying properties") - val specifiedProperties = props.propertyNames() - while (specifiedProperties.hasMoreElements) { - val key = specifiedProperties.nextElement().asInstanceOf[String] + val propNames = JavaConversions.asBuffer(Collections.list(props.propertyNames)).map(_.toString).sorted + for(key <- propNames) { if (!referenceSet.contains(key)) warn("Property %s is not valid".format(key)) else diff --git a/core/src/test/scala/other/kafka/TestLogCleaning.scala b/core/src/test/scala/other/kafka/TestLogCleaning.scala index d9c721ba3eae..0bef218230a7 100644 --- a/core/src/test/scala/other/kafka/TestLogCleaning.scala +++ b/core/src/test/scala/other/kafka/TestLogCleaning.scala @@ -27,6 +27,8 @@ import kafka.producer._ import kafka.consumer._ import kafka.serializer._ import kafka.utils._ +import kafka.log.FileMessageSet +import kafka.log.Log /** * This is a torture test that runs against an existing broker. 
Here is how it works: @@ -66,6 +68,11 @@ object TestLogCleaning { .describedAs("count") .ofType(classOf[java.lang.Integer]) .defaultsTo(1) + val percentDeletesOpt = parser.accepts("percent-deletes", "The percentage of updates that are deletes.") + .withRequiredArg + .describedAs("percent") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(0) val zkConnectOpt = parser.accepts("zk", "Zk url.") .withRequiredArg .describedAs("url") @@ -75,10 +82,18 @@ object TestLogCleaning { .describedAs("ms") .ofType(classOf[java.lang.Integer]) .defaultsTo(0) - val cleanupOpt = parser.accepts("cleanup", "Delete temp files when done.") + val dumpOpt = parser.accepts("dump", "Dump the message contents of a topic partition that contains test data from this test to standard out.") + .withRequiredArg + .describedAs("directory") + .ofType(classOf[String]) val options = parser.parse(args:_*) + if(options.has(dumpOpt)) { + dumpLog(new File(options.valueOf(dumpOpt))) + System.exit(0) + } + if(!options.has(brokerOpt) || !options.has(zkConnectOpt) || !options.has(numMessagesOpt)) { parser.printHelpOn(System.err) System.exit(1) @@ -86,83 +101,146 @@ object TestLogCleaning { // parse options val messages = options.valueOf(numMessagesOpt).longValue + val percentDeletes = options.valueOf(percentDeletesOpt).intValue val dups = options.valueOf(numDupsOpt).intValue val brokerUrl = options.valueOf(brokerOpt) val topicCount = options.valueOf(topicsOpt).intValue val zkUrl = options.valueOf(zkConnectOpt) val sleepSecs = options.valueOf(sleepSecsOpt).intValue - val cleanup = options.has(cleanupOpt) val testId = new Random().nextInt(Int.MaxValue) val topics = (0 until topicCount).map("log-cleaner-test-" + testId + "-" + _).toArray println("Producing %d messages...".format(messages)) - val producedDataFile = produceMessages(brokerUrl, topics, messages, dups, cleanup) + val producedDataFile = produceMessages(brokerUrl, topics, messages, dups, percentDeletes) println("Sleeping for %d seconds...".format(sleepSecs)) Thread.sleep(sleepSecs * 1000) println("Consuming messages...") - val consumedDataFile = consumeMessages(zkUrl, topics, cleanup) + val consumedDataFile = consumeMessages(zkUrl, topics) val producedLines = lineCount(producedDataFile) val consumedLines = lineCount(consumedDataFile) val reduction = 1.0 - consumedLines.toDouble/producedLines.toDouble println("%d rows of data produced, %d rows of data consumed (%.1f%% reduction).".format(producedLines, consumedLines, 100 * reduction)) - println("Validating output files...") - validateOutput(externalSort(producedDataFile), externalSort(consumedDataFile)) - println("All done.") + println("Deduplicating and validating output files...") + validateOutput(producedDataFile, consumedDataFile) + producedDataFile.delete() + consumedDataFile.delete() + } + + def dumpLog(dir: File) { + require(dir.exists, "Non-existant directory: " + dir.getAbsolutePath) + for(file <- dir.list.sorted; if file.endsWith(Log.LogFileSuffix)) { + val ms = new FileMessageSet(new File(dir, file)) + for(entry <- ms) { + val key = Utils.readString(entry.message.key) + val content = + if(entry.message.isNull) + null + else + Utils.readString(entry.message.payload) + println("offset = %s, key = %s, content = %s".format(entry.offset, key, content)) + } + } } def lineCount(file: File): Int = io.Source.fromFile(file).getLines.size - def validateOutput(produced: BufferedReader, consumed: BufferedReader) { - while(true) { - val prod = readFinalValue(produced) - val cons = readFinalValue(consumed) - if(prod == null && cons 
== null) { - return - } else if(prod != cons) { - System.err.println("Validation failed prod = %s, cons = %s!".format(prod, cons)) - System.exit(1) - } + def validateOutput(producedDataFile: File, consumedDataFile: File) { + val producedReader = externalSort(producedDataFile) + val consumedReader = externalSort(consumedDataFile) + val produced = valuesIterator(producedReader) + val consumed = valuesIterator(consumedReader) + val producedDedupedFile = new File(producedDataFile.getAbsolutePath + ".deduped") + val producedDeduped = new BufferedWriter(new FileWriter(producedDedupedFile), 1024*1024) + val consumedDedupedFile = new File(consumedDataFile.getAbsolutePath + ".deduped") + val consumedDeduped = new BufferedWriter(new FileWriter(consumedDedupedFile), 1024*1024) + var total = 0 + var mismatched = 0 + while(produced.hasNext && consumed.hasNext) { + val p = produced.next() + producedDeduped.write(p.toString) + producedDeduped.newLine() + val c = consumed.next() + consumedDeduped.write(c.toString) + consumedDeduped.newLine() + if(p != c) + mismatched += 1 + total += 1 } + producedDeduped.close() + consumedDeduped.close() + require(!produced.hasNext, "Additional values produced not found in consumer log.") + require(!consumed.hasNext, "Additional values consumed not found in producer log.") + println("Validated " + total + " values, " + mismatched + " mismatches.") + require(mismatched == 0, "Non-zero number of row mismatches.") + // if all the checks worked out we can delete the deduped files + producedDedupedFile.delete() + consumedDedupedFile.delete() } - def readFinalValue(reader: BufferedReader): (String, Int, Int) = { - def readTuple() = { - val line = reader.readLine - if(line == null) - null - else - line.split("\t") + def valuesIterator(reader: BufferedReader) = { + new IteratorTemplate[TestRecord] { + def makeNext(): TestRecord = { + var next = readNext(reader) + while(next != null && next.delete) + next = readNext(reader) + if(next == null) + allDone() + else + next + } } - var prev = readTuple() - if(prev == null) + } + + def readNext(reader: BufferedReader): TestRecord = { + var line = reader.readLine() + if(line == null) return null + var curr = new TestRecord(line) while(true) { - reader.mark(1024) - val curr = readTuple() - if(curr == null || curr(0) != prev(0) || curr(1) != prev(1)) { - reader.reset() - return (prev(0), prev(1).toInt, prev(2).toInt) - } else { - prev = curr - } + line = peekLine(reader) + if(line == null) + return curr + val next = new TestRecord(line) + if(next == null || next.topicAndKey != curr.topicAndKey) + return curr + curr = next + reader.readLine() } - return null + null + } + + def peekLine(reader: BufferedReader) = { + reader.mark(4096) + val line = reader.readLine + reader.reset() + line } def externalSort(file: File): BufferedReader = { - val builder = new ProcessBuilder("sort", "--key=1,2", "--stable", "--buffer-size=20%", file.getAbsolutePath) + val builder = new ProcessBuilder("sort", "--key=1,2", "--stable", "--buffer-size=20%", "--temporary-directory=" + System.getProperty("java.io.tmpdir"), file.getAbsolutePath) val process = builder.start() - new BufferedReader(new InputStreamReader(process.getInputStream())) + new Thread() { + override def run() { + val exitCode = process.waitFor() + if(exitCode != 0) { + System.err.println("Process exited abnormally.") + while(process.getErrorStream.available > 0) { + System.err.write(process.getErrorStream().read()) + } + } + } + }.start() + new BufferedReader(new 
InputStreamReader(process.getInputStream()), 10*1024*1024) } def produceMessages(brokerUrl: String, topics: Array[String], messages: Long, - dups: Int, - cleanup: Boolean): File = { + dups: Int, + percentDeletes: Int): File = { val producerProps = new Properties producerProps.setProperty("producer.type", "async") producerProps.setProperty("broker.list", brokerUrl) @@ -174,36 +252,49 @@ object TestLogCleaning { val rand = new Random(1) val keyCount = (messages / dups).toInt val producedFile = File.createTempFile("kafka-log-cleaner-produced-", ".txt") - if(cleanup) - producedFile.deleteOnExit() + println("Logging produce requests to " + producedFile.getAbsolutePath) val producedWriter = new BufferedWriter(new FileWriter(producedFile), 1024*1024) for(i <- 0L until (messages * topics.length)) { val topic = topics((i % topics.length).toInt) val key = rand.nextInt(keyCount) - producer.send(KeyedMessage(topic = topic, key = key.toString, message = i.toString)) - producedWriter.write("%s\t%s\t%s\n".format(topic, key, i)) + val delete = i % 100 < percentDeletes + val msg = + if(delete) + KeyedMessage[String, String](topic = topic, key = key.toString, message = null) + else + KeyedMessage[String, String](topic = topic, key = key.toString, message = i.toString) + producer.send(msg) + producedWriter.write(TestRecord(topic, key, i, delete).toString) + producedWriter.newLine() } producedWriter.close() producer.close() producedFile } - def consumeMessages(zkUrl: String, topics: Array[String], cleanup: Boolean): File = { + def makeConsumer(zkUrl: String, topics: Array[String]): ZookeeperConsumerConnector = { val consumerProps = new Properties consumerProps.setProperty("group.id", "log-cleaner-test-" + new Random().nextInt(Int.MaxValue)) consumerProps.setProperty("zk.connect", zkUrl) - consumerProps.setProperty("consumer.timeout.ms", (5*1000).toString) - val connector = new ZookeeperConsumerConnector(new ConsumerConfig(consumerProps)) + consumerProps.setProperty("consumer.timeout.ms", (10*1000).toString) + new ZookeeperConsumerConnector(new ConsumerConfig(consumerProps)) + } + + def consumeMessages(zkUrl: String, topics: Array[String]): File = { + val connector = makeConsumer(zkUrl, topics) val streams = connector.createMessageStreams(topics.map(topic => (topic, 1)).toMap, new StringDecoder, new StringDecoder) val consumedFile = File.createTempFile("kafka-log-cleaner-consumed-", ".txt") - if(cleanup) - consumedFile.deleteOnExit() + println("Logging consumed messages to " + consumedFile.getAbsolutePath) val consumedWriter = new BufferedWriter(new FileWriter(consumedFile)) for(topic <- topics) { val stream = streams(topic).head try { - for(item <- stream) - consumedWriter.write("%s\t%s\t%s\n".format(topic, item.key, item.message)) + for(item <- stream) { + val delete = item.message == null + val value = if(delete) -1L else item.message.toLong + consumedWriter.write(TestRecord(topic, item.key.toInt, value, delete).toString) + consumedWriter.newLine() + } } catch { case e: ConsumerTimeoutException => } @@ -213,4 +304,11 @@ object TestLogCleaning { consumedFile } +} + +case class TestRecord(val topic: String, val key: Int, val value: Long, val delete: Boolean) { + def this(pieces: Array[String]) = this(pieces(0), pieces(1).toInt, pieces(2).toLong, pieces(3) == "d") + def this(line: String) = this(line.split("\t")) + override def toString() = topic + "\t" + key + "\t" + value + "\t" + (if(delete) "d" else "u") + def topicAndKey = topic + key } \ No newline at end of file diff --git 
a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala index cce2319d23b4..4619d86fc93e 100644 --- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala @@ -52,7 +52,7 @@ class CleanerTest extends JUnitSuite { // append messages to the log until we have four segments while(log.numberOfSegments < 4) - log.append(messages(log.logEndOffset.toInt, log.logEndOffset.toInt)) + log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt)) val keysFound = keysInLog(log) assertEquals((0L until log.logEndOffset), keysFound) @@ -62,14 +62,38 @@ class CleanerTest extends JUnitSuite { keys.foreach(k => map.put(key(k), Long.MaxValue)) // clean the log - cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0) + cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0, 0L) val shouldRemain = keysInLog(log).filter(!keys.contains(_)) assertEquals(shouldRemain, keysInLog(log)) } + @Test + def testCleaningWithDeletes() { + val cleaner = makeCleaner(Int.MaxValue) + val log = makeLog(config = logConfig.copy(segmentSize = 1024)) + + // append messages with the keys 0 through N + while(log.numberOfSegments < 2) + log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt)) + + // delete all even keys between 0 and N + val leo = log.logEndOffset + for(key <- 0 until leo.toInt by 2) + log.append(deleteMessage(key)) + + // append some new unique keys to pad out to a new active segment + while(log.numberOfSegments < 4) + log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt)) + + cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0)) + val keys = keysInLog(log).toSet + assertTrue("None of the keys we deleted should still exist.", + (0 until leo.toInt by 2).forall(!keys.contains(_))) + } + /* extract all the keys from a log */ def keysInLog(log: Log): Iterable[Int] = - log.logSegments.flatMap(s => s.log.map(m => Utils.readString(m.message.key).toInt)) + log.logSegments.flatMap(s => s.log.filter(!_.message.isNull).map(m => Utils.readString(m.message.key).toInt)) /** @@ -82,14 +106,14 @@ class CleanerTest extends JUnitSuite { // append messages to the log until we have four segments while(log.numberOfSegments < 2) - log.append(messages(log.logEndOffset.toInt, log.logEndOffset.toInt)) + log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt)) log.truncateTo(log.logEndOffset-2) val keys = keysInLog(log) val map = new FakeOffsetMap(Int.MaxValue) keys.foreach(k => map.put(key(k), Long.MaxValue)) intercept[OptimisticLockFailureException] { - cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0) + cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0, 0L) } } @@ -170,40 +194,34 @@ class CleanerTest extends JUnitSuite { checkRange(map, segments(3).baseOffset.toInt, log.logEndOffset.toInt) } - /** - * Test that we don't exceed the maximum capacity of the offset map, that is that an offset map - * with a max size of 1000 will only clean 1000 new entries even if more than that are available. 
- */ - @Test - def testBuildOffsetMapOverCapacity() { - val map = new FakeOffsetMap(1000) - val log = makeLog() - val cleaner = makeCleaner(Int.MaxValue) - val vals = 0 until 1001 - val offsets = writeToLog(log, vals zip vals) - val lastOffset = cleaner.buildOffsetMap(log, vals.start, vals.end, map) - assertEquals("Shouldn't go beyond the capacity of the offset map.", 1000, lastOffset) - } - def makeLog(dir: File = dir, config: LogConfig = logConfig) = new Log(dir = dir, config = config, needsRecovery = false, scheduler = time.scheduler, time = time) def makeCleaner(capacity: Int) = - new Cleaner(id = 0, new FakeOffsetMap(capacity), ioBufferSize = 64*1024, maxIoBufferSize = 64*1024, throttler = throttler, time = time) + new Cleaner(id = 0, + offsetMap = new FakeOffsetMap(capacity), + ioBufferSize = 64*1024, + maxIoBufferSize = 64*1024, + dupBufferLoadFactor = 0.75, + throttler = throttler, + time = time) def writeToLog(log: Log, seq: Iterable[(Int, Int)]): Iterable[Long] = { for((key, value) <- seq) - yield log.append(messages(key, value)).firstOffset + yield log.append(message(key, value)).firstOffset } def key(id: Int) = ByteBuffer.wrap(id.toString.getBytes) - def messages(key: Int, value: Int) = + def message(key: Int, value: Int) = new ByteBufferMessageSet(new Message(key=key.toString.getBytes, bytes=value.toString.getBytes)) + def deleteMessage(key: Int) = + new ByteBufferMessageSet(new Message(key=key.toString.getBytes, bytes=null)) + } -class FakeOffsetMap(val capacity: Int) extends OffsetMap { +class FakeOffsetMap(val slots: Int) extends OffsetMap { val map = new java.util.HashMap[String, Long]() private def keyFor(key: ByteBuffer) = diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 23e0e6562dc3..5658ed4b62b2 100644 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -569,4 +569,17 @@ class LogTest extends JUnitSuite { assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments) } + @Test + def testAppendMessageWithNullPayload() { + var log = new Log(logDir, + LogConfig(), + needsRecovery = false, + time.scheduler, + time) + log.append(new ByteBufferMessageSet(new Message(bytes = null))) + val ms = log.read(0, 4096, None) + assertEquals(0, ms.head.offset) + assertTrue("Message payload should be null.", ms.head.message.isNull) + } + } diff --git a/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala b/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala index 99a0c4bfefb2..12ce39e665af 100644 --- a/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala +++ b/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala @@ -29,11 +29,12 @@ class OffsetMapTest extends JUnitSuite { validateMap(10) validateMap(100) validateMap(1000) + validateMap(5000) } @Test def testClear() { - val map = new SkimpyOffsetMap(4000, 0.75) + val map = new SkimpyOffsetMap(4000) for(i <- 0 until 10) map.put(key(i), i) for(i <- 0 until 10) @@ -43,45 +44,33 @@ class OffsetMapTest extends JUnitSuite { assertEquals(map.get(key(i)), -1L) } - @Test - def testCapacity() { - val map = new SkimpyOffsetMap(1024, 0.75) - var i = 0 - while(map.size < map.capacity) { - map.put(key(i), i) - i += 1 - } - // now the map is full, it should throw an exception - intercept[IllegalStateException] { - map.put(key(i), i) - } - } - def key(key: Int) = ByteBuffer.wrap(key.toString.getBytes) - def validateMap(items: Int) { - val map = new SkimpyOffsetMap(items * 2 * 24, 0.75) + def 
validateMap(items: Int, loadFactor: Double = 0.5): SkimpyOffsetMap = { + val map = new SkimpyOffsetMap((items/loadFactor * 24).toInt) for(i <- 0 until items) map.put(key(i), i) var misses = 0 - for(i <- 0 until items) { - map.get(key(i)) match { - case -1L => misses += 1 - case offset => assertEquals(i.toLong, offset) - } - } - println("Miss rate: " + (misses.toDouble / items)) + for(i <- 0 until items) + assertEquals(map.get(key(i)), i.toLong) + map } } object OffsetMapTest { def main(args: Array[String]) { - if(args.length != 1) { - System.err.println("USAGE: java OffsetMapTest size") + if(args.length != 2) { + System.err.println("USAGE: java OffsetMapTest size load") System.exit(1) } val test = new OffsetMapTest() - test.validateMap(args(0).toInt) + val size = args(0).toInt + val load = args(1).toDouble + val start = System.nanoTime + val map = test.validateMap(size, load) + val ellapsedMs = (System.nanoTime - start) / 1000.0 / 1000.0 + println(map.size + " entries in map of size " + map.slots + " in " + ellapsedMs + " ms") + println("Collision rate: %.1f%%".format(100*map.collisionRate)) } } \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/message/MessageTest.scala b/core/src/test/scala/unit/kafka/message/MessageTest.scala index 883442de563c..4837585d0353 100644 --- a/core/src/test/scala/unit/kafka/message/MessageTest.scala +++ b/core/src/test/scala/unit/kafka/message/MessageTest.scala @@ -38,7 +38,7 @@ class MessageTest extends JUnitSuite { @Before def setUp(): Unit = { val keys = Array(null, "key".getBytes, "".getBytes) - val vals = Array("value".getBytes, "".getBytes) + val vals = Array("value".getBytes, "".getBytes, null) val codecs = Array(NoCompressionCodec, GZIPCompressionCodec) for(k <- keys; v <- vals; codec <- codecs) messages += new MessageTestVal(k, v, codec, new Message(v, k, codec)) @@ -47,7 +47,12 @@ class MessageTest extends JUnitSuite { @Test def testFieldValues { for(v <- messages) { - TestUtils.checkEquals(ByteBuffer.wrap(v.payload), v.message.payload) + if(v.payload == null) { + assertTrue(v.message.isNull) + assertEquals("Payload should be null", null, v.message.payload) + } else { + TestUtils.checkEquals(ByteBuffer.wrap(v.payload), v.message.payload) + } assertEquals(Message.CurrentMagicValue, v.message.magic) if(v.message.hasKey) TestUtils.checkEquals(ByteBuffer.wrap(v.key), v.message.key) diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index e4b057e00fcc..507e6a8e5758 100644 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -288,7 +288,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ case e: FailedToSendMessageException => /* success */ case e: Exception => fail("Not expected", e) } finally { - producer.close + producer.close() } val t2 = SystemTime.milliseconds @@ -296,5 +296,28 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ // we do this because the DefaultEventHandler retries a number of times assertTrue((t2-t1) >= timeoutMs*config.messageSendMaxRetries) } + + @Test + def testSendNullMessage() { + val props = new Properties() + props.put("serializer.class", "kafka.serializer.StringEncoder") + props.put("partitioner.class", "kafka.utils.StaticPartitioner") + props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) + + val config = new ProducerConfig(props) + val producer = 
new Producer[String, String](config) + try { + + // create topic + AdminUtils.createTopic(zkClient, "new-topic", 2, 1) + assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() => + AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime)) + TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500) + + producer.send(new KeyedMessage[String, String]("new-topic", "key", null)) + } finally { + producer.close() + } + } } diff --git a/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala b/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala new file mode 100644 index 000000000000..02054c61b729 --- /dev/null +++ b/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala @@ -0,0 +1,41 @@ +package kafka.utils + +import junit.framework.Assert._ +import org.scalatest.Assertions +import org.junit.{Test, After, Before} + +class IteratorTemplateTest extends Assertions { + + val lst = (0 until 10).toSeq + val iterator = new IteratorTemplate[Int]() { + var i = 0 + override def makeNext() = { + if(i >= lst.size) { + allDone() + } else { + val item = lst(i) + i += 1 + item + } + } + } + + @Test + def testIterator() { + for(i <- 0 until 10) { + assertEquals("We should have an item to read.", true, iterator.hasNext) + assertEquals("Checking again shouldn't change anything.", true, iterator.hasNext) + assertEquals("Peeking at the item should show the right thing.", i, iterator.peek) + assertEquals("Peeking again shouldn't change anything", i, iterator.peek) + assertEquals("Getting the item should give the right thing.", i, iterator.next) + } + assertEquals("All gone!", false, iterator.hasNext) + intercept[NoSuchElementException] { + iterator.peek + } + intercept[NoSuchElementException] { + iterator.next + } + } + +} \ No newline at end of file
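
A minimal sketch of how the null-payload ("tombstone") support introduced above might be exercised with the producer API used in these tests; the broker address, topic name, and key are illustrative, and the consumer side simply sees a MessageAndMetadata whose value is null:

    import java.util.Properties
    import kafka.producer.{Producer, ProducerConfig, KeyedMessage}

    object TombstoneExample {
      def main(args: Array[String]) {
        val props = new Properties()
        props.put("serializer.class", "kafka.serializer.StringEncoder")
        props.put("broker.list", "localhost:9092")  // hypothetical broker
        val producer = new Producer[String, String](new ProducerConfig(props))
        try {
          // write a value for a key, then delete it by sending a null payload;
          // the log cleaner will eventually drop both records for this key
          producer.send(new KeyedMessage[String, String]("my-compacted-topic", "user-42", "some value"))
          producer.send(new KeyedMessage[String, String]("my-compacted-topic", "user-42", null))
        } finally {
          producer.close()
        }
      }
    }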