KAFKA-7366: Make topic configs segment.bytes and segment.ms to take effect immediately #5728

Merged (6 commits) on Oct 9, 2018
core/src/main/scala/kafka/log/Log.scala (21 additions, 1 deletion)
@@ -145,6 +145,26 @@ case class CompletedTxn(producerId: Long, firstOffset: Long, lastOffset: Long, i
}
}

+/**
+ * A class used to hold the parameters required to decide whether to roll a log segment.
+ */
+case class RollParams(maxSegmentMs: Long,
+                      maxSegmentBytes: Int,
+                      maxTimestampInMessages: Long,
+                      maxOffsetInMessages: Long,
+                      messagesSize: Int,
+                      now: Long)
+
+object RollParams {
+  def apply(config: LogConfig, appendInfo: LogAppendInfo, messagesSize: Int, now: Long): RollParams = {
+    new RollParams(config.segmentMs,
+      config.segmentSize,
+      appendInfo.maxTimestamp,
+      appendInfo.lastOffset,
+      messagesSize, now)
+  }
+}

/**
 * An append-only log for storing messages.
 *
@@ -1493,7 +1513,7 @@ class Log(@volatile var dir: File,
    val maxTimestampInMessages = appendInfo.maxTimestamp
    val maxOffsetInMessages = appendInfo.lastOffset

-    if (segment.shouldRoll(messagesSize, maxTimestampInMessages, maxOffsetInMessages, now)) {
+    if (segment.shouldRoll(RollParams(config, appendInfo, messagesSize, now))) {
      debug(s"Rolling new log segment (log_size = ${segment.size}/${config.segmentSize}, " +
        s"offset_index_size = ${segment.offsetIndex.entries}/${segment.offsetIndex.maxEntries}, " +
        s"time_index_size = ${segment.timeIndex.entries}/${segment.timeIndex.maxEntries}, " +
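Why this change makes segment.bytes and segment.ms take effect immediately: previously, maxSegmentMs and maxSegmentBytes were constructor fields of LogSegment, frozen when the segment was created, so a dynamic topic-config update only applied to segments created afterwards. With RollParams, the current LogConfig is re-read on every append and handed to shouldRoll. Below is a minimal standalone sketch of that pattern; the names are illustrative, not Kafka's internal API.

// Standalone sketch of the pattern; illustrative names, not Kafka's internal API.
case class RollParams(maxSegmentBytes: Int, messagesSize: Int)

class Segment(var size: Int) {
  // The segment no longer caches its limits; they arrive with every roll check.
  def shouldRoll(p: RollParams): Boolean = size > p.maxSegmentBytes - p.messagesSize
}

object RollParamsDemo extends App {
  @volatile var segmentBytes = 1000          // stands in for the dynamically updatable segment.bytes
  val seg = new Segment(size = 900)
  def check(): Boolean = seg.shouldRoll(RollParams(segmentBytes, messagesSize = 200))

  println(check())   // true: 900 > 1000 - 200, so the append would overshoot the old limit
  segmentBytes = 2000                        // simulate a dynamic segment.bytes update
  println(check())   // false: the new limit is honored on the very next roll check
}

The design choice is simply to pass the limits as call-time data instead of construction-time state, so a config update becomes visible at the very next roll decision.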
core/src/main/scala/kafka/log/LogSegment.scala (6 additions, 8 deletions)
@@ -45,8 +45,10 @@ import scala.math._
 * @param log The file records containing log entries
 * @param offsetIndex The offset index
 * @param timeIndex The timestamp index
+ * @param txnIndex The transaction index
 * @param baseOffset A lower bound on the offsets in this segment
 * @param indexIntervalBytes The approximate number of bytes between entries in the index
+ * @param rollJitterMs The maximum random jitter subtracted from the scheduled segment roll time
 * @param time The time instance
 */
@nonthreadsafe
@@ -57,15 +59,13 @@ class LogSegment private[log] (val log: FileRecords,
                               val baseOffset: Long,
                               val indexIntervalBytes: Int,
                               val rollJitterMs: Long,
-                              val maxSegmentMs: Long,
-                              val maxSegmentBytes: Int,
                               val time: Time) extends Logging {

-  def shouldRoll(messagesSize: Int, maxTimestampInMessages: Long, maxOffsetInMessages: Long, now: Long): Boolean = {
-    val reachedRollMs = timeWaitedForRoll(now, maxTimestampInMessages) > maxSegmentMs - rollJitterMs
-    size > maxSegmentBytes - messagesSize ||
+  def shouldRoll(rollParams: RollParams): Boolean = {
+    val reachedRollMs = timeWaitedForRoll(rollParams.now, rollParams.maxTimestampInMessages) > rollParams.maxSegmentMs - rollJitterMs
+    size > rollParams.maxSegmentBytes - rollParams.messagesSize ||
      (size > 0 && reachedRollMs) ||
-      offsetIndex.isFull || timeIndex.isFull || !canConvertToRelativeOffset(maxOffsetInMessages)
+      offsetIndex.isFull || timeIndex.isFull || !canConvertToRelativeOffset(rollParams.maxOffsetInMessages)
  }

  def resizeIndexes(size: Int): Unit = {
@@ -637,8 +637,6 @@ object LogSegment {
      baseOffset,
      indexIntervalBytes = config.indexInterval,
      rollJitterMs = config.randomSegmentJitter,
-      maxSegmentMs = config.segmentMs,
-      maxSegmentBytes = config.segmentSize,
      time)
  }

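To make the time-based roll condition above concrete, here is a self-contained sketch with illustrative numbers; it mirrors the reachedRollMs expression but is not Kafka code.

// Illustrative numbers for the time-based roll condition; not Kafka code.
object ReachedRollMsDemo extends App {
  val maxSegmentMs = 300000L   // segment.ms
  val rollJitterMs = 15000L    // random per-segment jitter, so many segments do not all roll at once

  // Mirrors: timeWaitedForRoll(...) > maxSegmentMs - rollJitterMs
  def reachedRollMs(timeWaitedMs: Long): Boolean = timeWaitedMs > maxSegmentMs - rollJitterMs

  println(reachedRollMs(290000L))  // true: 290000 > 285000, a non-empty segment would roll
  println(reachedRollMs(280000L))  // false: still within the jittered deadline
}

Note the size > 0 guard in shouldRoll: an empty segment is never rolled on time alone, which keeps idle partitions from producing a stream of zero-byte segments.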
core/src/test/scala/unit/kafka/log/LogSegmentTest.scala (12 additions, 16 deletions)
@@ -38,9 +38,8 @@ class LogSegmentTest {
  /* create a segment with the given base offset */
  def createSegment(offset: Long,
                    indexIntervalBytes: Int = 10,
-                   maxSegmentMs: Int = Int.MaxValue,
                    time: Time = Time.SYSTEM): LogSegment = {
-    val seg = LogUtils.createSegment(offset, logDir, indexIntervalBytes, maxSegmentMs, time)
+    val seg = LogUtils.createSegment(offset, logDir, indexIntervalBytes, time)
    segments += seg
    seg
  }
@@ -163,10 +162,10 @@ class LogSegmentTest {

    val maxSegmentMs = 300000
    val time = new MockTime
-    val seg = createSegment(0, maxSegmentMs = maxSegmentMs, time = time)
+    val seg = createSegment(0, time = time)
    seg.close()

-    val reopened = createSegment(0, maxSegmentMs = maxSegmentMs, time = time)
+    val reopened = createSegment(0, time = time)
    assertEquals(0, seg.timeIndex.sizeInBytes)
    assertEquals(0, seg.offsetIndex.sizeInBytes)

@@ -176,24 +175,21 @@
    assertFalse(reopened.timeIndex.isFull)
    assertFalse(reopened.offsetIndex.isFull)

-    assertFalse(reopened.shouldRoll(messagesSize = 1024,
-      maxTimestampInMessages = RecordBatch.NO_TIMESTAMP,
-      maxOffsetInMessages = 100L,
-      now = time.milliseconds()))
+    var rollParams = RollParams(maxSegmentMs, maxSegmentBytes = Int.MaxValue, RecordBatch.NO_TIMESTAMP,
+      maxOffsetInMessages = 100L, messagesSize = 1024, time.milliseconds())
+    assertFalse(reopened.shouldRoll(rollParams))

    // The segment should not be rolled even if maxSegmentMs has been exceeded
    time.sleep(maxSegmentMs + 1)
    assertEquals(maxSegmentMs + 1, reopened.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP))
-    assertFalse(reopened.shouldRoll(messagesSize = 1024,
-      maxTimestampInMessages = RecordBatch.NO_TIMESTAMP,
-      maxOffsetInMessages = 100L,
-      now = time.milliseconds()))
+    rollParams = RollParams(maxSegmentMs, maxSegmentBytes = Int.MaxValue, RecordBatch.NO_TIMESTAMP,
+      maxOffsetInMessages = 100L, messagesSize = 1024, time.milliseconds())
+    assertFalse(reopened.shouldRoll(rollParams))

    // But we should still roll the segment if we cannot fit the next offset
-    assertTrue(reopened.shouldRoll(messagesSize = 1024,
-      maxTimestampInMessages = RecordBatch.NO_TIMESTAMP,
-      maxOffsetInMessages = Int.MaxValue.toLong + 200,
-      now = time.milliseconds()))
+    rollParams = RollParams(maxSegmentMs, maxSegmentBytes = Int.MaxValue, RecordBatch.NO_TIMESTAMP,
+      maxOffsetInMessages = Int.MaxValue.toLong + 200L, messagesSize = 1024, time.milliseconds())
+    assertTrue(reopened.shouldRoll(rollParams))
  }

@Test
core/src/test/scala/unit/kafka/log/LogTest.scala (1 addition, 1 deletion)
@@ -277,7 +277,7 @@ class LogTest {

    override def addSegment(segment: LogSegment): LogSegment = {
      val wrapper = new LogSegment(segment.log, segment.offsetIndex, segment.timeIndex, segment.txnIndex, segment.baseOffset,
-        segment.indexIntervalBytes, segment.rollJitterMs, segment.maxSegmentMs, segment.maxSegmentBytes, mockTime) {
+        segment.indexIntervalBytes, segment.rollJitterMs, mockTime) {

        override def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long,
                          minOneMessage: Boolean): FetchDataInfo = {
core/src/test/scala/unit/kafka/log/LogUtils.scala (1 addition, 2 deletions)
@@ -29,13 +29,12 @@ object LogUtils {
  def createSegment(offset: Long,
                    logDir: File,
                    indexIntervalBytes: Int = 10,
-                   maxSegmentMs: Int = Int.MaxValue,
                    time: Time = Time.SYSTEM): LogSegment = {
    val ms = FileRecords.open(Log.logFile(logDir, offset))
    val idx = new OffsetIndex(Log.offsetIndexFile(logDir, offset), offset, maxIndexSize = 1000)
    val timeIdx = new TimeIndex(Log.timeIndexFile(logDir, offset), offset, maxIndexSize = 1500)
    val txnIndex = new TransactionIndex(offset, Log.transactionIndexFile(logDir, offset))

-    new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, maxSegmentMs, Int.MaxValue, time)
+    new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, time)
  }
}
core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala (27 additions)
@@ -59,6 +59,33 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
    }
  }

+  @Test
+  def testDynamicTopicConfigChange() {
+    val tp = new TopicPartition("test", 0)
+    val oldSegmentSize = 1000
+    val logProps = new Properties()
+    logProps.put(SegmentBytesProp, oldSegmentSize.toString)
+    createTopic(tp.topic, 1, 1, logProps)
+    TestUtils.retry(10000) {
+      val logOpt = this.servers.head.logManager.getLog(tp)
+      assertTrue(logOpt.isDefined)
+      assertEquals(oldSegmentSize, logOpt.get.config.segmentSize)
+    }
+
+    val log = servers.head.logManager.getLog(tp).get
+
+    val newSegmentSize = 2000
+    logProps.put(SegmentBytesProp, newSegmentSize.toString)
+    adminZkClient.changeTopicConfig(tp.topic, logProps)
+    TestUtils.retry(10000) {
+      assertEquals(newSegmentSize, log.config.segmentSize)
+    }
+
+    (1 to 50).foreach(i => TestUtils.produceMessage(servers, tp.topic, i.toString))
+    // Verify that the new config is used for all segments
+    assertTrue("Log segment size change not applied", log.logSegments.forall(_.size > 1000))
+  }

  private def testQuotaConfigChange(user: String, clientId: String, rootEntityType: String, configEntityName: String) {
    assertTrue("Should contain a ConfigHandler for " + rootEntityType,
      this.servers.head.dynamicConfigHandlers.contains(rootEntityType))
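The new test drives the update through adminZkClient.changeTopicConfig; outside of tests the same dynamic change can be made with kafka-configs.sh or the Java AdminClient. A sketch against the AdminClient API available in Kafka 2.0 follows; the broker address and topic name are placeholders.

import java.util.{Collections, Properties}

import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, Config, ConfigEntry}
import org.apache.kafka.common.config.ConfigResource

object AlterSegmentBytes extends App {
  val props = new Properties()
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")  // placeholder broker
  val admin = AdminClient.create(props)
  try {
    val resource = new ConfigResource(ConfigResource.Type.TOPIC, "test")
    val config = new Config(Collections.singleton(new ConfigEntry("segment.bytes", "2000")))
    // With this PR merged, the new segment.bytes is honored by the active segment's
    // next roll check, not only by segments created after the change.
    admin.alterConfigs(Collections.singletonMap(resource, config)).all().get()
  } finally {
    admin.close()
  }
}

One caveat: alterConfigs is non-incremental and replaces the topic's full set of overrides, so any other overrides to keep must be included in the same call (the incremental variant, incrementalAlterConfigs, only arrived later, in Kafka 2.3).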