Skip to content

Commit

Permalink
KAFKA-16446: Improve controller event duration logging (#15622)
Browse files Browse the repository at this point in the history
There are times when the controller has a high event processing time, such as during startup, or when creating a topic with many partitions. We can see these processing times in the p99 metric (kafka.controller:type=ControllerEventManager,name=EventQueueProcessingTimeMs), however it's difficult to see exactly which event is causing high processing time.

With DEBUG logs, we see every event along with its processing time. Even with this, it's a bit tedious to find the event with a high processing time.

This PR logs all events which take longer than 2 seconds at ERROR level. This will help identify events that are taking far too long, and which could be disruptive to the operation of the controller. The slow event logging looks like this:

```
[2024-12-20 15:03:39,754] ERROR [QuorumController id=1] Exceptionally slow controller event createTopics took 5240 ms.  (org.apache.kafka.controller.EventPerformanceMonitor)
```

Also, every 60 seconds, it logs some event time statistics, including average time, maximum time, and the name of the event which took the longest. This periodic message looks like this:

```
[2024-12-20 15:35:04,798] INFO [QuorumController id=1] In the last 60000 ms period, 333 events were completed, which took an average of 12.34 ms each. The slowest event was handleCommit[baseOffset=0], which took 41.90 ms. (org.apache.kafka.controller.EventPerformanceMonitor)
```

An operator can disable these logs by adding the following to their log4j config:

```
org.apache.kafka.controller.EventPerformanceMonitor=OFF
```

Reviewers: Colin P. McCabe <[email protected]>
  • Loading branch information
mumrah authored Jan 6, 2025
1 parent a52aedd commit c4840f5
Show file tree
Hide file tree
Showing 6 changed files with 403 additions and 9 deletions.
4 changes: 3 additions & 1 deletion core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,9 @@ class ControllerServer(
setDelegationTokenExpiryTimeMs(config.delegationTokenExpiryTimeMs).
setDelegationTokenExpiryCheckIntervalMs(config.delegationTokenExpiryCheckIntervalMs).
setUncleanLeaderElectionCheckIntervalMs(config.uncleanLeaderElectionCheckIntervalMs).
setInterBrokerListenerName(config.interBrokerListenerName.value())
setInterBrokerListenerName(config.interBrokerListenerName.value()).
setControllerPerformanceSamplePeriodMs(config.controllerPerformanceSamplePeriodMs).
setControllerPerformanceAlwaysLogThresholdMs(config.controllerPerformanceAlwaysLogThresholdMs)
}
controller = controllerBuilder.build()

Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
val initialRegistrationTimeoutMs: Int = getInt(KRaftConfigs.INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_CONFIG)
val brokerHeartbeatIntervalMs: Int = getInt(KRaftConfigs.BROKER_HEARTBEAT_INTERVAL_MS_CONFIG)
val brokerSessionTimeoutMs: Int = getInt(KRaftConfigs.BROKER_SESSION_TIMEOUT_MS_CONFIG)
val controllerPerformanceSamplePeriodMs: Long = getLong(KRaftConfigs.CONTROLLER_PERFORMANCE_SAMPLE_PERIOD_MS)
val controllerPerformanceAlwaysLogThresholdMs: Long = getLong(KRaftConfigs.CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS)

def requiresZookeeper: Boolean = processRoles.isEmpty
def usesSelfManagedQuorum: Boolean = processRoles.nonEmpty
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.controller;

import org.apache.kafka.common.utils.LogContext;

import org.slf4j.Logger;

import java.text.DecimalFormat;
import java.util.AbstractMap;
import java.util.Map;

import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

/**
* Track the performance of controller events. Periodically log the slowest events.
* Log any event slower than a certain threshold.
*/
class EventPerformanceMonitor {
/**
* The format to use when displaying milliseconds.
*/
private static final DecimalFormat MILLISECOND_DECIMAL_FORMAT = new DecimalFormat("#0.00");

static class Builder {
LogContext logContext = null;
long periodNs = SECONDS.toNanos(60);
long alwaysLogThresholdNs = SECONDS.toNanos(2);

Builder setLogContext(LogContext logContext) {
this.logContext = logContext;
return this;
}

Builder setPeriodNs(long periodNs) {
this.periodNs = periodNs;
return this;
}

Builder setAlwaysLogThresholdNs(long alwaysLogThresholdNs) {
this.alwaysLogThresholdNs = alwaysLogThresholdNs;
return this;
}

EventPerformanceMonitor build() {
if (logContext == null) logContext = new LogContext();
return new EventPerformanceMonitor(logContext,
periodNs,
alwaysLogThresholdNs);
}
}

/**
* The log4j object to use.
*/
private final Logger log;

/**
* The period in nanoseconds.
*/
private long periodNs;

/**
* The always-log threshold in nanoseconds.
*/
private long alwaysLogThresholdNs;

/**
* The name of the slowest event we've seen so far, or null if none has been seen.
*/
private String slowestEventName;

/**
* The duration of the slowest event we've seen so far, or 0 if none has been seen.
*/
private long slowestEventDurationNs;

/**
* The total duration of all the events we've seen.
*/
private long totalEventDurationNs;

/**
* The number of events we've seen.
*/
private int numEvents;

private EventPerformanceMonitor(
LogContext logContext,
long periodNs,
long alwaysLogThresholdNs
) {
this.log = logContext.logger(EventPerformanceMonitor.class);
this.periodNs = periodNs;
this.alwaysLogThresholdNs = alwaysLogThresholdNs;
reset();
}

long periodNs() {
return periodNs;
}

Map.Entry<String, Long> slowestEvent() {
return new AbstractMap.SimpleImmutableEntry<>(slowestEventName, slowestEventDurationNs);
}

/**
* Reset all internal state.
*/
void reset() {
this.slowestEventName = null;
this.slowestEventDurationNs = 0;
this.totalEventDurationNs = 0;
this.numEvents = 0;
}

/**
* Handle a controller event being finished.
*
* @param name The name of the controller event.
* @param durationNs The duration of the controller event in nanoseconds.
*/
void observeEvent(String name, long durationNs) {
String message = doObserveEvent(name, durationNs);
if (message != null) {
log.error("{}", message);
}
}

/**
* Handle a controller event being finished.
*
* @param name The name of the controller event.
* @param durationNs The duration of the controller event in nanoseconds.
*
* @return The message to log, or null otherwise.
*/
String doObserveEvent(String name, long durationNs) {
if (slowestEventName == null || slowestEventDurationNs < durationNs) {
slowestEventName = name;
slowestEventDurationNs = durationNs;
}
totalEventDurationNs += durationNs;
numEvents++;
if (durationNs < alwaysLogThresholdNs) {
return null;
}
return "Exceptionally slow controller event " + name + " took " +
NANOSECONDS.toMillis(durationNs) + " ms.";
}

/**
* Generate a log message summarizing the events of the last period,
* and then reset our internal state.
*/
void generatePeriodicPerformanceMessage() {
String message = periodicPerformanceMessage();
log.info("{}", message);
reset();
}

/**
* Generate a log message summarizing the events of the last period.
*
* @return The summary string.
*/
String periodicPerformanceMessage() {
StringBuilder bld = new StringBuilder();
bld.append("In the last ");
bld.append(NANOSECONDS.toMillis(periodNs));
bld.append(" ms period, ");
if (numEvents == 0) {
bld.append("there were no controller events completed.");
} else {
bld.append(numEvents).append(" controller events were completed, which took an average of ");
bld.append(formatNsAsDecimalMs(totalEventDurationNs / numEvents));
bld.append(" ms each. The slowest event was ").append(slowestEventName);
bld.append(", which took ");
bld.append(formatNsAsDecimalMs(slowestEventDurationNs));
bld.append(" ms.");
}
return bld.toString();
}

/**
* Translate a duration in nanoseconds to a decimal duration in milliseconds.
*
* @param durationNs The duration in nanoseconds.
* @return The decimal duration in milliseconds.
*/
static String formatNsAsDecimalMs(long durationNs) {
double number = NANOSECONDS.toMicros(durationNs);
number /= 1000;
return MILLISECOND_DECIMAL_FORMAT.format(number);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,16 +175,21 @@
*/
public final class QuorumController implements Controller {
/**
* The maximum records that the controller will write in a single batch.
* The default maximum records that the controller will write in a single batch.
*/
private static final int MAX_RECORDS_PER_BATCH = 10000;
private static final int DEFAULT_MAX_RECORDS_PER_BATCH = 10000;

/**
* The default minimum event time that can be logged as a slow event.
*/
private static final int DEFAULT_MIN_SLOW_EVENT_TIME_MS = 200;

/**
* The maximum records any user-initiated operation is allowed to generate.
*
* For now, this is set to the maximum records in a single batch.
*/
static final int MAX_RECORDS_PER_USER_OP = MAX_RECORDS_PER_BATCH;
static final int MAX_RECORDS_PER_USER_OP = DEFAULT_MAX_RECORDS_PER_BATCH;

/**
* A builder class which creates the QuorumController.
Expand Down Expand Up @@ -213,7 +218,9 @@ public static class Builder {
private ConfigurationValidator configurationValidator = ConfigurationValidator.NO_OP;
private Map<String, Object> staticConfig = Collections.emptyMap();
private BootstrapMetadata bootstrapMetadata = null;
private int maxRecordsPerBatch = MAX_RECORDS_PER_BATCH;
private int maxRecordsPerBatch = DEFAULT_MAX_RECORDS_PER_BATCH;
private long controllerPerformanceSamplePeriodMs = 60000L;
private long controllerPerformanceAlwaysLogThresholdMs = 2000L;
private DelegationTokenCache tokenCache;
private String tokenSecretKeyString;
private long delegationTokenMaxLifeMs;
Expand Down Expand Up @@ -321,6 +328,16 @@ public Builder setMaxRecordsPerBatch(int maxRecordsPerBatch) {
return this;
}

public Builder setControllerPerformanceSamplePeriodMs(long controllerPerformanceSamplePeriodMs) {
this.controllerPerformanceSamplePeriodMs = controllerPerformanceSamplePeriodMs;
return this;
}

public Builder setControllerPerformanceAlwaysLogThresholdMs(long controllerPerformanceAlwaysLogThresholdMs) {
this.controllerPerformanceAlwaysLogThresholdMs = controllerPerformanceAlwaysLogThresholdMs;
return this;
}

public Builder setCreateTopicPolicy(Optional<CreateTopicPolicy> createTopicPolicy) {
this.createTopicPolicy = createTopicPolicy;
return this;
Expand Down Expand Up @@ -433,7 +450,9 @@ public QuorumController build() throws Exception {
delegationTokenExpiryTimeMs,
delegationTokenExpiryCheckIntervalMs,
uncleanLeaderElectionCheckIntervalMs,
interBrokerListenerName
interBrokerListenerName,
controllerPerformanceSamplePeriodMs,
controllerPerformanceAlwaysLogThresholdMs
);
} catch (Exception e) {
Utils.closeQuietly(queue, "event queue");
Expand Down Expand Up @@ -524,6 +543,7 @@ private void handleEventEnd(String name, long startProcessingTimeNs) {
long deltaNs = endProcessingTime - startProcessingTimeNs;
log.debug("Processed {} in {} us", name,
MICROSECONDS.convert(deltaNs, NANOSECONDS));
performanceMonitor.observeEvent(name, deltaNs);
controllerMetrics.updateEventQueueProcessingTime(NANOSECONDS.toMillis(deltaNs));
}

Expand All @@ -536,6 +556,8 @@ private Throwable handleEventException(
if (startProcessingTimeNs.isPresent()) {
long endProcessingTime = time.nanoseconds();
long deltaNs = endProcessingTime - startProcessingTimeNs.getAsLong();
performanceMonitor.observeEvent(name, deltaNs);
controllerMetrics.updateEventQueueProcessingTime(NANOSECONDS.toMillis(deltaNs));
deltaUs = OptionalLong.of(MICROSECONDS.convert(deltaNs, NANOSECONDS));
} else {
deltaUs = OptionalLong.empty();
Expand Down Expand Up @@ -1446,6 +1468,11 @@ private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, lon
*/
private final RecordRedactor recordRedactor;

/**
* Monitors the performance of controller events and generates logs about it.
*/
private final EventPerformanceMonitor performanceMonitor;

private QuorumController(
FaultHandler nonFatalFaultHandler,
FaultHandler fatalFaultHandler,
Expand Down Expand Up @@ -1477,7 +1504,9 @@ private QuorumController(
long delegationTokenExpiryTimeMs,
long delegationTokenExpiryCheckIntervalMs,
long uncleanLeaderElectionCheckIntervalMs,
String interBrokerListenerName
String interBrokerListenerName,
long controllerPerformanceSamplePeriodMs,
long controllerPerformanceAlwaysLogThresholdMs
) {
this.nonFatalFaultHandler = nonFatalFaultHandler;
this.fatalFaultHandler = fatalFaultHandler;
Expand Down Expand Up @@ -1574,6 +1603,11 @@ private QuorumController(
this.metaLogListener = new QuorumMetaLogListener();
this.curClaimEpoch = -1;
this.recordRedactor = new RecordRedactor(configSchema);
this.performanceMonitor = new EventPerformanceMonitor.Builder().
setLogContext(logContext).
setPeriodNs(TimeUnit.MILLISECONDS.toNanos(controllerPerformanceSamplePeriodMs)).
setAlwaysLogThresholdNs(TimeUnit.MILLISECONDS.toNanos(controllerPerformanceAlwaysLogThresholdMs)).
build();
if (maxIdleIntervalNs.isPresent()) {
registerWriteNoOpRecord(maxIdleIntervalNs.getAsLong());
}
Expand All @@ -1587,7 +1621,7 @@ private QuorumController(
}
registerElectUnclean(TimeUnit.MILLISECONDS.toNanos(uncleanLeaderElectionCheckIntervalMs));
registerExpireDelegationTokens(MILLISECONDS.toNanos(delegationTokenExpiryCheckIntervalMs));

registerGeneratePeriodicPerformanceMessage();
// OffsetControlManager must be initialized last, because its constructor will take the
// initial in-memory snapshot of all extant timeline data structures.
this.offsetControl = new OffsetControlManager.Builder().
Expand All @@ -1597,7 +1631,6 @@ private QuorumController(
setTime(time).
build();
log.info("Creating new QuorumController with clusterId {}", clusterId);

this.raftClient.register(metaLogListener);
}

Expand Down Expand Up @@ -1681,6 +1714,21 @@ private void registerElectUnclean(long checkIntervalNs) {
EnumSet.of(PeriodicTaskFlag.VERBOSE)));
}

/**
* Register the generatePeriodicPerformanceMessage task.
*
* This task periodically logs some statistics about controller performance.
*/
private void registerGeneratePeriodicPerformanceMessage() {
periodicControl.registerTask(new PeriodicTask("generatePeriodicPerformanceMessage",
() -> {
performanceMonitor.generatePeriodicPerformanceMessage();
return ControllerResult.of(Collections.emptyList(), false);
},
performanceMonitor.periodNs(),
EnumSet.noneOf(PeriodicTaskFlag.class)));
}

/**
* Register the delegation token expiration task.
*
Expand Down
Loading

0 comments on commit c4840f5

Please sign in to comment.