Skip to content

Commit

Permalink
KAFKA-7366: Make topic configs segment.bytes and segment.ms to take e…
Browse files Browse the repository at this point in the history
…ffect immediately (apache#5728)

Reviewers: Ismael Juma <[email protected]> and Jun Rao <[email protected]>
  • Loading branch information
omkreddy authored and pengxiaolong committed Jun 14, 2019
1 parent a207457 commit ef34dd5
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 28 deletions.
22 changes: 21 additions & 1 deletion core/src/main/scala/kafka/log/Log.scala
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,26 @@ case class CompletedTxn(producerId: Long, firstOffset: Long, lastOffset: Long, i
}
}

/**
* A class used to hold params required to decide to rotate a log segment or not.
*/
case class RollParams(maxSegmentMs: Long,
maxSegmentBytes: Int,
maxTimestampInMessages: Long,
maxOffsetInMessages: Long,
messagesSize: Int,
now: Long)

object RollParams {
def apply(config: LogConfig, appendInfo: LogAppendInfo, messagesSize: Int, now: Long): RollParams = {
new RollParams(config.segmentMs,
config.segmentSize,
appendInfo.maxTimestamp,
appendInfo.lastOffset,
messagesSize, now)
}
}

/**
* An append-only log for storing messages.
*
Expand Down Expand Up @@ -1493,7 +1513,7 @@ class Log(@volatile var dir: File,
val maxTimestampInMessages = appendInfo.maxTimestamp
val maxOffsetInMessages = appendInfo.lastOffset

if (segment.shouldRoll(messagesSize, maxTimestampInMessages, maxOffsetInMessages, now)) {
if (segment.shouldRoll(RollParams(config, appendInfo, messagesSize, now))) {
debug(s"Rolling new log segment (log_size = ${segment.size}/${config.segmentSize}}, " +
s"offset_index_size = ${segment.offsetIndex.entries}/${segment.offsetIndex.maxEntries}, " +
s"time_index_size = ${segment.timeIndex.entries}/${segment.timeIndex.maxEntries}, " +
Expand Down
14 changes: 6 additions & 8 deletions core/src/main/scala/kafka/log/LogSegment.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@ import scala.math._
* @param log The file records containing log entries
* @param offsetIndex The offset index
* @param timeIndex The timestamp index
* @param txnIndex The transaction index
* @param baseOffset A lower bound on the offsets in this segment
* @param indexIntervalBytes The approximate number of bytes between entries in the index
* @param rollJitterMs The maximum random jitter subtracted from the scheduled segment roll time
* @param time The time instance
*/
@nonthreadsafe
Expand All @@ -57,15 +59,13 @@ class LogSegment private[log] (val log: FileRecords,
val baseOffset: Long,
val indexIntervalBytes: Int,
val rollJitterMs: Long,
val maxSegmentMs: Long,
val maxSegmentBytes: Int,
val time: Time) extends Logging {

def shouldRoll(messagesSize: Int, maxTimestampInMessages: Long, maxOffsetInMessages: Long, now: Long): Boolean = {
val reachedRollMs = timeWaitedForRoll(now, maxTimestampInMessages) > maxSegmentMs - rollJitterMs
size > maxSegmentBytes - messagesSize ||
def shouldRoll(rollParams: RollParams): Boolean = {
val reachedRollMs = timeWaitedForRoll(rollParams.now, rollParams.maxTimestampInMessages) > rollParams.maxSegmentMs - rollJitterMs
size > rollParams.maxSegmentBytes - rollParams.messagesSize ||
(size > 0 && reachedRollMs) ||
offsetIndex.isFull || timeIndex.isFull || !canConvertToRelativeOffset(maxOffsetInMessages)
offsetIndex.isFull || timeIndex.isFull || !canConvertToRelativeOffset(rollParams.maxOffsetInMessages)
}

def resizeIndexes(size: Int): Unit = {
Expand Down Expand Up @@ -637,8 +637,6 @@ object LogSegment {
baseOffset,
indexIntervalBytes = config.indexInterval,
rollJitterMs = config.randomSegmentJitter,
maxSegmentMs = config.segmentMs,
maxSegmentBytes = config.segmentSize,
time)
}

Expand Down
28 changes: 12 additions & 16 deletions core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@ class LogSegmentTest {
/* create a segment with the given base offset */
def createSegment(offset: Long,
indexIntervalBytes: Int = 10,
maxSegmentMs: Int = Int.MaxValue,
time: Time = Time.SYSTEM): LogSegment = {
val seg = LogUtils.createSegment(offset, logDir, indexIntervalBytes, maxSegmentMs, time)
val seg = LogUtils.createSegment(offset, logDir, indexIntervalBytes, time)
segments += seg
seg
}
Expand Down Expand Up @@ -163,10 +162,10 @@ class LogSegmentTest {

val maxSegmentMs = 300000
val time = new MockTime
val seg = createSegment(0, maxSegmentMs = maxSegmentMs, time = time)
val seg = createSegment(0, time = time)
seg.close()

val reopened = createSegment(0, maxSegmentMs = maxSegmentMs, time = time)
val reopened = createSegment(0, time = time)
assertEquals(0, seg.timeIndex.sizeInBytes)
assertEquals(0, seg.offsetIndex.sizeInBytes)

Expand All @@ -176,24 +175,21 @@ class LogSegmentTest {
assertFalse(reopened.timeIndex.isFull)
assertFalse(reopened.offsetIndex.isFull)

assertFalse(reopened.shouldRoll(messagesSize = 1024,
maxTimestampInMessages = RecordBatch.NO_TIMESTAMP,
maxOffsetInMessages = 100L,
now = time.milliseconds()))
var rollParams = RollParams(maxSegmentMs, maxSegmentBytes = Int.MaxValue, RecordBatch.NO_TIMESTAMP,
maxOffsetInMessages = 100L, messagesSize = 1024, time.milliseconds())
assertFalse(reopened.shouldRoll(rollParams))

// The segment should not be rolled even if maxSegmentMs has been exceeded
time.sleep(maxSegmentMs + 1)
assertEquals(maxSegmentMs + 1, reopened.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP))
assertFalse(reopened.shouldRoll(messagesSize = 1024,
maxTimestampInMessages = RecordBatch.NO_TIMESTAMP,
maxOffsetInMessages = 100L,
now = time.milliseconds()))
rollParams = RollParams(maxSegmentMs, maxSegmentBytes = Int.MaxValue, RecordBatch.NO_TIMESTAMP,
maxOffsetInMessages = 100L, messagesSize = 1024, time.milliseconds())
assertFalse(reopened.shouldRoll(rollParams))

// But we should still roll the segment if we cannot fit the next offset
assertTrue(reopened.shouldRoll(messagesSize = 1024,
maxTimestampInMessages = RecordBatch.NO_TIMESTAMP,
maxOffsetInMessages = Int.MaxValue.toLong + 200,
now = time.milliseconds()))
rollParams = RollParams(maxSegmentMs, maxSegmentBytes = Int.MaxValue, RecordBatch.NO_TIMESTAMP,
maxOffsetInMessages = Int.MaxValue.toLong + 200L, messagesSize = 1024, time.milliseconds())
assertTrue(reopened.shouldRoll(rollParams))
}

@Test
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/log/LogTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ class LogTest {

override def addSegment(segment: LogSegment): LogSegment = {
val wrapper = new LogSegment(segment.log, segment.offsetIndex, segment.timeIndex, segment.txnIndex, segment.baseOffset,
segment.indexIntervalBytes, segment.rollJitterMs, segment.maxSegmentMs, segment.maxSegmentBytes, mockTime) {
segment.indexIntervalBytes, segment.rollJitterMs, mockTime) {

override def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long,
minOneMessage: Boolean): FetchDataInfo = {
Expand Down
3 changes: 1 addition & 2 deletions core/src/test/scala/unit/kafka/log/LogUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,12 @@ object LogUtils {
def createSegment(offset: Long,
logDir: File,
indexIntervalBytes: Int = 10,
maxSegmentMs: Int = Int.MaxValue,
time: Time = Time.SYSTEM): LogSegment = {
val ms = FileRecords.open(Log.logFile(logDir, offset))
val idx = new OffsetIndex(Log.offsetIndexFile(logDir, offset), offset, maxIndexSize = 1000)
val timeIdx = new TimeIndex(Log.timeIndexFile(logDir, offset), offset, maxIndexSize = 1500)
val txnIndex = new TransactionIndex(offset, Log.transactionIndexFile(logDir, offset))

new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, maxSegmentMs, Int.MaxValue, time)
new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, time)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,33 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
}
}

@Test
def testDynamicTopicConfigChange() {
val tp = new TopicPartition("test", 0)
val oldSegmentSize = 1000
val logProps = new Properties()
logProps.put(SegmentBytesProp, oldSegmentSize.toString)
createTopic(tp.topic, 1, 1, logProps)
TestUtils.retry(10000) {
val logOpt = this.servers.head.logManager.getLog(tp)
assertTrue(logOpt.isDefined)
assertEquals(oldSegmentSize, logOpt.get.config.segmentSize)
}

val log = servers.head.logManager.getLog(tp).get

val newSegmentSize = 2000
logProps.put(SegmentBytesProp, newSegmentSize.toString)
adminZkClient.changeTopicConfig(tp.topic, logProps)
TestUtils.retry(10000) {
assertEquals(newSegmentSize, log.config.segmentSize)
}

(1 to 50).foreach(i => TestUtils.produceMessage(servers, tp.topic, i.toString))
// Verify that the new config is used for all segments
assertTrue("Log segment size change not applied", log.logSegments.forall(_.size > 1000))
}

private def testQuotaConfigChange(user: String, clientId: String, rootEntityType: String, configEntityName: String) {
assertTrue("Should contain a ConfigHandler for " + rootEntityType ,
this.servers.head.dynamicConfigHandlers.contains(rootEntityType))
Expand Down

0 comments on commit ef34dd5

Please sign in to comment.