Skip to content

Commit

Permalink
[LI-HOTFIX] Reduce lock retention and improve broker shutdown time:
Browse files Browse the repository at this point in the history
TICKET = [KAFKA-8667, KAFKA-8668]
LI_DESCRIPTION =
- Avoid acquiring partitionMap lock in shutdownIdleFetcherThread
- Avoid appending to the time index during shutdown if the time index has not yet be initialized

RB=1431408
BUG=LIKAFKA-19361
G=Kafka-Code-Reviews
R=jkoshy,jonlee
A=jkoshy,jonlee

EXIT_CRITERIA = TICKET [KAFKA-8667, KAFKA-8668]
  • Loading branch information
hzxa21 authored and gitlw committed Jun 13, 2020
1 parent 153d37f commit 155b4f8
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 2 deletions.
4 changes: 3 additions & 1 deletion core/src/main/scala/kafka/log/LogSegment.scala
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,9 @@ class LogSegment private[log] (val log: FileRecords,
* Close this log segment
*/
def close(): Unit = {
CoreUtils.swallow(timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar, skipFullCheck = true), this)
if (_maxTimestampSoFar.nonEmpty || _offsetOfMaxTimestampSoFar.nonEmpty) {
CoreUtils.swallow(timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar, skipFullCheck = true), this)
}
CoreUtils.swallow(offsetIndex.close(), this)
CoreUtils.swallow(timeIndex.close(), this)
CoreUtils.swallow(log.close(), this)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri
lock synchronized {
val keysToBeRemoved = new mutable.HashSet[BrokerIdAndFetcherId]
for ((key, fetcher) <- fetcherThreadMap) {
if (fetcher.partitionCount <= 0) {
if (fetcher.idle) {
fetcher.shutdown()
keysToBeRemoved += key
}
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/scala/kafka/server/AbstractFetcherThread.scala
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ abstract class AbstractFetcherThread(name: String,
val fetcherStats = new FetcherStats(metricId)
val fetcherLagStats = new FetcherLagStats(metricId)

@volatile var idle = false

/* callbacks to be defined in subclass */

// process fetched data
Expand Down Expand Up @@ -651,6 +653,7 @@ abstract class AbstractFetcherThread(name: String,
partitionStates.remove(topicPartition)
fetcherLagStats.unregister(topicPartition)
}
idle = partitionStates.size() <= 0
} finally partitionMapLock.unlock()
}

Expand Down

0 comments on commit 155b4f8

Please sign in to comment.