diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 094473a8e2668..bc328d77efc86 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -145,6 +145,26 @@ case class CompletedTxn(producerId: Long, firstOffset: Long, lastOffset: Long, i } } +/** + * A class used to hold params required to decide to rotate a log segment or not. + */ +case class RollParams(maxSegmentMs: Long, + maxSegmentBytes: Int, + maxTimestampInMessages: Long, + maxOffsetInMessages: Long, + messagesSize: Int, + now: Long) + +object RollParams { + def apply(config: LogConfig, appendInfo: LogAppendInfo, messagesSize: Int, now: Long): RollParams = { + new RollParams(config.segmentMs, + config.segmentSize, + appendInfo.maxTimestamp, + appendInfo.lastOffset, + messagesSize, now) + } +} + /** * An append-only log for storing messages. * @@ -1493,7 +1513,7 @@ class Log(@volatile var dir: File, val maxTimestampInMessages = appendInfo.maxTimestamp val maxOffsetInMessages = appendInfo.lastOffset - if (segment.shouldRoll(messagesSize, maxTimestampInMessages, maxOffsetInMessages, now)) { + if (segment.shouldRoll(RollParams(config, appendInfo, messagesSize, now))) { debug(s"Rolling new log segment (log_size = ${segment.size}/${config.segmentSize}}, " + s"offset_index_size = ${segment.offsetIndex.entries}/${segment.offsetIndex.maxEntries}, " + s"time_index_size = ${segment.timeIndex.entries}/${segment.timeIndex.maxEntries}, " + diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 80763a8d79720..d910a29100c44 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -45,8 +45,10 @@ import scala.math._ * @param log The file records containing log entries * @param offsetIndex The offset index * @param timeIndex The timestamp index + * @param txnIndex The transaction index * @param baseOffset A lower bound on the offsets in this segment * @param indexIntervalBytes The approximate number of bytes between entries in the index + * @param rollJitterMs The maximum random jitter subtracted from the scheduled segment roll time * @param time The time instance */ @nonthreadsafe @@ -57,15 +59,13 @@ class LogSegment private[log] (val log: FileRecords, val baseOffset: Long, val indexIntervalBytes: Int, val rollJitterMs: Long, - val maxSegmentMs: Long, - val maxSegmentBytes: Int, val time: Time) extends Logging { - def shouldRoll(messagesSize: Int, maxTimestampInMessages: Long, maxOffsetInMessages: Long, now: Long): Boolean = { - val reachedRollMs = timeWaitedForRoll(now, maxTimestampInMessages) > maxSegmentMs - rollJitterMs - size > maxSegmentBytes - messagesSize || + def shouldRoll(rollParams: RollParams): Boolean = { + val reachedRollMs = timeWaitedForRoll(rollParams.now, rollParams.maxTimestampInMessages) > rollParams.maxSegmentMs - rollJitterMs + size > rollParams.maxSegmentBytes - rollParams.messagesSize || (size > 0 && reachedRollMs) || - offsetIndex.isFull || timeIndex.isFull || !canConvertToRelativeOffset(maxOffsetInMessages) + offsetIndex.isFull || timeIndex.isFull || !canConvertToRelativeOffset(rollParams.maxOffsetInMessages) } def resizeIndexes(size: Int): Unit = { @@ -637,8 +637,6 @@ object LogSegment { baseOffset, indexIntervalBytes = config.indexInterval, rollJitterMs = config.randomSegmentJitter, - maxSegmentMs = config.segmentMs, - maxSegmentBytes = config.segmentSize, time) } diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala index 40b687443a056..353e5537588e3 100644 --- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala @@ -38,9 +38,8 @@ class LogSegmentTest { /* create a segment with the given base offset */ def createSegment(offset: Long, indexIntervalBytes: Int = 10, - maxSegmentMs: Int = Int.MaxValue, time: Time = Time.SYSTEM): LogSegment = { - val seg = LogUtils.createSegment(offset, logDir, indexIntervalBytes, maxSegmentMs, time) + val seg = LogUtils.createSegment(offset, logDir, indexIntervalBytes, time) segments += seg seg } @@ -163,10 +162,10 @@ class LogSegmentTest { val maxSegmentMs = 300000 val time = new MockTime - val seg = createSegment(0, maxSegmentMs = maxSegmentMs, time = time) + val seg = createSegment(0, time = time) seg.close() - val reopened = createSegment(0, maxSegmentMs = maxSegmentMs, time = time) + val reopened = createSegment(0, time = time) assertEquals(0, seg.timeIndex.sizeInBytes) assertEquals(0, seg.offsetIndex.sizeInBytes) @@ -176,24 +175,21 @@ class LogSegmentTest { assertFalse(reopened.timeIndex.isFull) assertFalse(reopened.offsetIndex.isFull) - assertFalse(reopened.shouldRoll(messagesSize = 1024, - maxTimestampInMessages = RecordBatch.NO_TIMESTAMP, - maxOffsetInMessages = 100L, - now = time.milliseconds())) + var rollParams = RollParams(maxSegmentMs, maxSegmentBytes = Int.MaxValue, RecordBatch.NO_TIMESTAMP, + maxOffsetInMessages = 100L, messagesSize = 1024, time.milliseconds()) + assertFalse(reopened.shouldRoll(rollParams)) // The segment should not be rolled even if maxSegmentMs has been exceeded time.sleep(maxSegmentMs + 1) assertEquals(maxSegmentMs + 1, reopened.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP)) - assertFalse(reopened.shouldRoll(messagesSize = 1024, - maxTimestampInMessages = RecordBatch.NO_TIMESTAMP, - maxOffsetInMessages = 100L, - now = time.milliseconds())) + rollParams = RollParams(maxSegmentMs, maxSegmentBytes = Int.MaxValue, RecordBatch.NO_TIMESTAMP, + maxOffsetInMessages = 100L, messagesSize = 1024, time.milliseconds()) + assertFalse(reopened.shouldRoll(rollParams)) // But we should still roll the segment if we cannot fit the next offset - assertTrue(reopened.shouldRoll(messagesSize = 1024, - maxTimestampInMessages = RecordBatch.NO_TIMESTAMP, - maxOffsetInMessages = Int.MaxValue.toLong + 200, - now = time.milliseconds())) + rollParams = RollParams(maxSegmentMs, maxSegmentBytes = Int.MaxValue, RecordBatch.NO_TIMESTAMP, + maxOffsetInMessages = Int.MaxValue.toLong + 200L, messagesSize = 1024, time.milliseconds()) + assertTrue(reopened.shouldRoll(rollParams)) } @Test diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 151c4ed0ad808..77289989546db 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -277,7 +277,7 @@ class LogTest { override def addSegment(segment: LogSegment): LogSegment = { val wrapper = new LogSegment(segment.log, segment.offsetIndex, segment.timeIndex, segment.txnIndex, segment.baseOffset, - segment.indexIntervalBytes, segment.rollJitterMs, segment.maxSegmentMs, segment.maxSegmentBytes, mockTime) { + segment.indexIntervalBytes, segment.rollJitterMs, mockTime) { override def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long, minOneMessage: Boolean): FetchDataInfo = { diff --git a/core/src/test/scala/unit/kafka/log/LogUtils.scala b/core/src/test/scala/unit/kafka/log/LogUtils.scala index eb218952d0a49..8652aa509d0cf 100644 --- a/core/src/test/scala/unit/kafka/log/LogUtils.scala +++ b/core/src/test/scala/unit/kafka/log/LogUtils.scala @@ -29,13 +29,12 @@ object LogUtils { def createSegment(offset: Long, logDir: File, indexIntervalBytes: Int = 10, - maxSegmentMs: Int = Int.MaxValue, time: Time = Time.SYSTEM): LogSegment = { val ms = FileRecords.open(Log.logFile(logDir, offset)) val idx = new OffsetIndex(Log.offsetIndexFile(logDir, offset), offset, maxIndexSize = 1000) val timeIdx = new TimeIndex(Log.timeIndexFile(logDir, offset), offset, maxIndexSize = 1500) val txnIndex = new TransactionIndex(offset, Log.transactionIndexFile(logDir, offset)) - new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, maxSegmentMs, Int.MaxValue, time) + new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, time) } } diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala index 510c4a3e273d8..cabe0a984e23c 100644 --- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala @@ -59,6 +59,33 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { } } + @Test + def testDynamicTopicConfigChange() { + val tp = new TopicPartition("test", 0) + val oldSegmentSize = 1000 + val logProps = new Properties() + logProps.put(SegmentBytesProp, oldSegmentSize.toString) + createTopic(tp.topic, 1, 1, logProps) + TestUtils.retry(10000) { + val logOpt = this.servers.head.logManager.getLog(tp) + assertTrue(logOpt.isDefined) + assertEquals(oldSegmentSize, logOpt.get.config.segmentSize) + } + + val log = servers.head.logManager.getLog(tp).get + + val newSegmentSize = 2000 + logProps.put(SegmentBytesProp, newSegmentSize.toString) + adminZkClient.changeTopicConfig(tp.topic, logProps) + TestUtils.retry(10000) { + assertEquals(newSegmentSize, log.config.segmentSize) + } + + (1 to 50).foreach(i => TestUtils.produceMessage(servers, tp.topic, i.toString)) + // Verify that the new config is used for all segments + assertTrue("Log segment size change not applied", log.logSegments.forall(_.size > 1000)) + } + private def testQuotaConfigChange(user: String, clientId: String, rootEntityType: String, configEntityName: String) { assertTrue("Should contain a ConfigHandler for " + rootEntityType , this.servers.head.dynamicConfigHandlers.contains(rootEntityType))