KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) (apache#14699)

This PR provides the implementation of the client metrics manager along with its supporting classes. The manager is responsible for three operations:

UpdateSubscription - from kafka-configs.sh and on reload from the metadata cache.
Process GetTelemetrySubscriptions request - from KafkaApis.scala.
Process PushTelemetry request - from KafkaApis.scala.
The manager maintains an in-memory cache that tracks client instances by their client instance id.
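
For orientation, here is a minimal sketch of the shape that description implies. Every name below is an illustrative assumption rather than the committed API; only the constructor call visible in the BrokerServer.scala diff further down comes from the commit itself.

import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.common.Uuid;

public class ClientMetricsManagerSketch {

    // In-memory cache of client instance state, keyed by client instance id.
    private final Map<Uuid, ClientInstanceState> instanceCache = new ConcurrentHashMap<>();

    // 1. UpdateSubscription: driven by kafka-configs.sh and by reloads from the metadata cache.
    public void updateSubscription(String subscriptionName, Properties properties) {
        // Recompute subscriptions; cached client instances observe the change
        // on their next GetTelemetrySubscriptions round trip.
    }

    // 2. GetTelemetrySubscriptions: called from KafkaApis.scala. A client without an
    //    instance id is assigned one and entered into the cache.
    public ClientInstanceState processGetTelemetrySubscriptionRequest(Uuid clientInstanceId) {
        Uuid id = (clientInstanceId == null || clientInstanceId.equals(Uuid.ZERO_UUID))
            ? Uuid.randomUuid() : clientInstanceId;
        return instanceCache.computeIfAbsent(id, ClientInstanceState::new);
    }

    // 3. PushTelemetry: called from KafkaApis.scala. Validation failures map to the
    //    new error codes added in this commit (UNKNOWN_SUBSCRIPTION_ID, TELEMETRY_TOO_LARGE).
    public void processPushTelemetryRequest(Uuid clientInstanceId, int subscriptionId, byte[] metrics) {
        ClientInstanceState state = instanceCache.get(clientInstanceId);
        // ... validate the subscription id and payload size, then hand the
        // metrics to the registered receiver plugin.
    }

    static final class ClientInstanceState {
        final Uuid clientInstanceId;
        ClientInstanceState(Uuid clientInstanceId) { this.clientInstanceId = clientInstanceId; }
    }
}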

Reviewers: Andrew Schofield <[email protected]>, Jun Rao <[email protected]>
apoorvmittal10 authored and clolov committed Apr 5, 2024
1 parent 24d0db9 commit 7d9bb1a
Showing 25 changed files with 2,179 additions and 83 deletions.
11 changes: 11 additions & 0 deletions checkstyle/import-control-server.xml
@@ -38,17 +38,20 @@

<!-- anyone can use public classes -->
<allow pkg="org.apache.kafka.common" exact-match="true" />
<allow pkg="org.apache.kafka.common.cache" />
<allow pkg="org.apache.kafka.common.config" />
<allow pkg="org.apache.kafka.common.security" />
<allow pkg="org.apache.kafka.common.serialization" />
<allow pkg="org.apache.kafka.common.utils" />
<allow pkg="org.apache.kafka.common.errors" exact-match="true" />
<allow pkg="org.apache.kafka.common.memory" />
<allow pkg="org.apache.kafka.common.network" />

<!-- protocol, records and request/response utilities -->
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.common.requests" />

<!-- utilities and reusable classes from server-common -->
@@ -58,4 +61,12 @@
<!-- persistent collection factories/non-library-specific wrappers -->
<allow pkg="org.apache.kafka.server.immutable" exact-match="true" />

<!-- server-metrics specific classes -->
<allow pkg="org.apache.kafka.server.metrics" />

<subpackage name="metrics">
<allow class="org.apache.kafka.server.authorizer.AuthorizableRequestContext" />
<allow pkg="org.apache.kafka.server.telemetry" />
</subpackage>

</import-control>
28 changes: 28 additions & 0 deletions clients/src/main/java/org/apache/kafka/common/errors/TelemetryTooLargeException.java
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;

/**
* This exception indicates that the size of the telemetry metrics data is too large.
*/
public class TelemetryTooLargeException extends ApiException {

public TelemetryTooLargeException(String message) {
super(message);
}
}

28 changes: 28 additions & 0 deletions clients/src/main/java/org/apache/kafka/common/errors/UnknownSubscriptionIdException.java
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;

/**
* This exception indicates that the client sent an invalid or outdated subscription ID.
*/
public class UnknownSubscriptionIdException extends ApiException {

public UnknownSubscriptionIdException(String message) {
super(message);
}
}

clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -20,6 +20,7 @@
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
import org.apache.kafka.common.errors.BrokerNotAvailableException;
import org.apache.kafka.common.errors.TelemetryTooLargeException;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.ConcurrentTransactionsException;
import org.apache.kafka.common.errors.ControllerMovedException;
@@ -118,6 +119,7 @@
import org.apache.kafka.common.errors.TransactionCoordinatorFencedException;
import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
import org.apache.kafka.common.errors.TransactionalIdNotFoundException;
import org.apache.kafka.common.errors.UnknownSubscriptionIdException;
import org.apache.kafka.common.errors.UnacceptableCredentialException;
import org.apache.kafka.common.errors.UnknownControllerIdException;
import org.apache.kafka.common.errors.UnknownLeaderEpochException;
@@ -386,7 +388,9 @@ public enum Errors {
STALE_MEMBER_EPOCH(113, "The member epoch is stale. The member must retry after receiving its updated member epoch via the ConsumerGroupHeartbeat API.", StaleMemberEpochException::new),
MISMATCHED_ENDPOINT_TYPE(114, "The request was sent to an endpoint of the wrong type.", MismatchedEndpointTypeException::new),
UNSUPPORTED_ENDPOINT_TYPE(115, "This endpoint type is not supported yet.", UnsupportedEndpointTypeException::new),
UNKNOWN_CONTROLLER_ID(116, "This controller ID is not known.", UnknownControllerIdException::new);
UNKNOWN_CONTROLLER_ID(116, "This controller ID is not known.", UnknownControllerIdException::new),
UNKNOWN_SUBSCRIPTION_ID(117, "Client sent a push telemetry request with an invalid or outdated subscription ID.", UnknownSubscriptionIdException::new),
TELEMETRY_TOO_LARGE(118, "Client sent a push telemetry request larger than the maximum size the broker will accept.", TelemetryTooLargeException::new);

private static final Logger log = LoggerFactory.getLogger(Errors.class);

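As a quick illustration of the two mappings added above (Errors.forException and Errors.forCode are long-standing helpers on this enum):

import org.apache.kafka.common.errors.TelemetryTooLargeException;
import org.apache.kafka.common.protocol.Errors;

public class NewTelemetryErrorCodes {
    public static void main(String[] args) {
        // Exceptions raised while handling a push map back to the new codes...
        System.out.println(Errors.forException(new TelemetryTooLargeException("over limit")).code()); // 118
        // ...and codes received on the wire resolve to the matching enum constant.
        System.out.println(Errors.forCode((short) 117)); // UNKNOWN_SUBSCRIPTION_ID
    }
}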
clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java
@@ -14,19 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.requests;

import org.apache.kafka.common.message.PushTelemetryRequestData;
import org.apache.kafka.common.message.PushTelemetryResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;

import java.nio.ByteBuffer;

public class PushTelemetryRequest extends AbstractRequest {

private static final String OTLP_CONTENT_TYPE = "OTLP";

public static class Builder extends AbstractRequest.Builder<PushTelemetryRequest> {

private final PushTelemetryRequestData data;
@@ -60,17 +62,39 @@ public PushTelemetryRequest(PushTelemetryRequestData data, short version) {

@Override
public PushTelemetryResponse getErrorResponse(int throttleTimeMs, Throwable e) {
PushTelemetryResponseData responseData = new PushTelemetryResponseData()
.setErrorCode(Errors.forException(e).code())
.setThrottleTimeMs(throttleTimeMs);
return new PushTelemetryResponse(responseData);
return errorResponse(throttleTimeMs, Errors.forException(e));
}

@Override
public PushTelemetryRequestData data() {
return data;
}

public PushTelemetryResponse errorResponse(int throttleTimeMs, Errors errors) {
PushTelemetryResponseData responseData = new PushTelemetryResponseData();
responseData.setErrorCode(errors.code());
responseData.setThrottleTimeMs(throttleTimeMs);
return new PushTelemetryResponse(responseData);
}

public String metricsContentType() {
// Future versions of PushTelemetryRequest and GetTelemetrySubscriptionsRequest may include a content-type
// field to allow for updated OTLP format versions (or additional formats), but this field is currently not
// included since only one format is specified in the current KIP-714 proposal.
return OTLP_CONTENT_TYPE;
}

public ByteBuffer metricsData() {
CompressionType cType = CompressionType.forId(this.data.compressionType());
return (cType == CompressionType.NONE) ?
ByteBuffer.wrap(this.data.metrics()) : decompressMetricsData(cType, this.data.metrics());
}

private static ByteBuffer decompressMetricsData(CompressionType compressionType, byte[] metrics) {
// TODO: Add support for decompression of metrics data
return ByteBuffer.wrap(metrics);
}

public static PushTelemetryRequest parse(ByteBuffer buffer, short version) {
return new PushTelemetryRequest(new PushTelemetryRequestData(
new ByteBufferAccessor(buffer), version), version);
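The decompressMetricsData method above is a placeholder, per its TODO. Once decompression support lands, the GZIP case might look roughly like the following; the use of java.util.zip here is an assumption for illustration, and the eventual implementation could instead reuse Kafka's own compression utilities:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.zip.GZIPInputStream;

public final class MetricsDecompression {

    // Inflate a GZIP-compressed telemetry payload into a ByteBuffer.
    public static ByteBuffer decompressGzip(byte[] metrics) throws IOException {
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(metrics));
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            byte[] chunk = new byte[8192];
            int read;
            while ((read = in.read(chunk)) != -1)
                out.write(chunk, 0, read);
            return ByteBuffer.wrap(out.toByteArray());
        }
    }
}

Note also the new errorResponse(int, Errors) overload above, which lets the request handler build a response for a specific error, for example request.errorResponse(throttleTimeMs, Errors.TELEMETRY_TOO_LARGE), without routing through an exception.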
7 changes: 4 additions & 3 deletions core/src/main/scala/kafka/server/BrokerServer.scala
@@ -46,7 +46,7 @@ import org.apache.kafka.server.{AssignmentsManager, ClientMetricsManager, NodeTo
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler, TopicIdPartition}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin, KafkaYammerMetrics}
import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo}
import org.apache.kafka.server.util.timer.SystemTimer
import org.apache.kafka.server.util.{Deadline, FutureUtils, KafkaScheduler}
@@ -177,7 +177,8 @@ class BrokerServer(

info("Starting broker")

config.dynamicConfig.initialize(zkClientOpt = None)
val clientMetricsReceiverPlugin = new ClientMetricsReceiverPlugin()
config.dynamicConfig.initialize(zkClientOpt = None, Some(clientMetricsReceiverPlugin))

/* start scheduler */
kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
@@ -346,7 +347,7 @@
config, Some(clientToControllerChannelManager), None, None,
groupCoordinator, transactionCoordinator)

clientMetricsManager = ClientMetricsManager.instance()
clientMetricsManager = new ClientMetricsManager(clientMetricsReceiverPlugin, config.clientTelemetryMaxBytes, time)

dynamicConfigHandlers = Map[String, ConfigHandler](
ConfigType.Topic -> new TopicConfigHandler(replicaManager, config, quotaManagers, None),
10 changes: 3 additions & 7 deletions core/src/main/scala/kafka/server/ControllerServer.scala
@@ -44,7 +44,7 @@ import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
import org.apache.kafka.metadata.migration.{KRaftMigrationDriver, LegacyPropagator}
import org.apache.kafka.metadata.publisher.FeaturesPublisher
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.{ClientMetricsManager, NodeToControllerChannelManager}
import org.apache.kafka.server.NodeToControllerChannelManager
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
@@ -125,7 +125,6 @@ class ControllerServer(
@volatile var incarnationId: Uuid = _
@volatile var registrationManager: ControllerRegistrationManager = _
@volatile var registrationChannelManager: NodeToControllerChannelManager = _
var clientMetricsManager: ClientMetricsManager = _

private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus): Boolean = {
lock.lock()
@@ -147,7 +146,7 @@
try {
this.logIdent = logContext.logPrefix()
info("Starting controller")
config.dynamicConfig.initialize(zkClientOpt = None)
config.dynamicConfig.initialize(zkClientOpt = None, clientMetricsReceiverPluginOpt = None)

maybeChangeStatus(STARTING, STARTED)

@@ -334,8 +333,6 @@
DataPlaneAcceptor.ThreadPrefix,
"controller")

clientMetricsManager = ClientMetricsManager.instance()

// Set up the metadata cache publisher.
metadataPublishers.add(metadataCachePublisher)

@@ -366,8 +363,7 @@
sharedServer.metadataPublishingFaultHandler,
immutable.Map[String, ConfigHandler](
// controllers don't host topics, so no need to do anything with dynamic topic config changes here
ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers),
ConfigType.ClientMetrics -> new ClientMetricsConfigHandler(clientMetricsManager)
ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers)
),
"controller"))

23 changes: 22 additions & 1 deletion core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -38,6 +38,8 @@ import org.apache.kafka.common.security.authenticator.LoginManager
import org.apache.kafka.common.utils.{ConfigUtils, Utils}
import org.apache.kafka.server.config.ServerTopicConfigSynonyms
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.ClientMetricsReceiverPlugin
import org.apache.kafka.server.telemetry.ClientTelemetry
import org.apache.kafka.storage.internals.log.{LogConfig, ProducerStateManagerConfig}

import scala.annotation.nowarn
@@ -216,15 +218,17 @@
private[server] val reconfigurables = new CopyOnWriteArrayList[Reconfigurable]()
private val brokerReconfigurables = new CopyOnWriteArrayList[BrokerReconfigurable]()
private val lock = new ReentrantReadWriteLock
private var metricsReceiverPluginOpt: Option[ClientMetricsReceiverPlugin] = _
private var currentConfig: KafkaConfig = _
private val dynamicConfigPasswordEncoder = if (kafkaConfig.processRoles.isEmpty) {
maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret)
} else {
Some(PasswordEncoder.noop())
}

private[server] def initialize(zkClientOpt: Option[KafkaZkClient]): Unit = {
private[server] def initialize(zkClientOpt: Option[KafkaZkClient], clientMetricsReceiverPluginOpt: Option[ClientMetricsReceiverPlugin]): Unit = {
currentConfig = new KafkaConfig(kafkaConfig.props, false, None)
metricsReceiverPluginOpt = clientMetricsReceiverPluginOpt

zkClientOpt.foreach { zkClient =>
val adminZkClient = new AdminZkClient(zkClient)
@@ -327,6 +331,10 @@
dynamicDefaultConfigs.clone()
}

private[server] def clientMetricsReceiverPlugin: Option[ClientMetricsReceiverPlugin] = CoreUtils.inReadLock(lock) {
metricsReceiverPluginOpt
}

private[server] def updateBrokerConfig(brokerId: Int, persistentProps: Properties, doLog: Boolean = true): Unit = CoreUtils.inWriteLock(lock) {
try {
val props = fromPersistentProps(persistentProps, perBrokerConfig = true)
@@ -913,6 +921,19 @@ class DynamicMetricReporterState(brokerId: Int, config: KafkaConfig, metrics: Me
reporters.forEach { reporter =>
metrics.addReporter(reporter)
currentReporters += reporter.getClass.getName -> reporter
val clientTelemetryReceiver = reporter match {
case telemetry: ClientTelemetry => telemetry.clientReceiver()
case _ => null
}

if (clientTelemetryReceiver != null) {
dynamicConfig.clientMetricsReceiverPlugin match {
case Some(receiverPlugin) =>
receiverPlugin.add(clientTelemetryReceiver)
case None =>
// Do nothing
}
}
}
KafkaBroker.notifyClusterListeners(clusterId, reporters.asScala)
}
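The reporter-registration logic above looks for metrics reporters that also implement the ClientTelemetry plugin interface. A minimal sketch of such a reporter follows; the receiver's lambda shape and the ClientTelemetryPayload accessors are assumptions based on KIP-714 rather than code shown in this diff:

import java.util.List;
import java.util.Map;

import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.server.telemetry.ClientTelemetry;
import org.apache.kafka.server.telemetry.ClientTelemetryReceiver;

public class LoggingTelemetryReporter implements MetricsReporter, ClientTelemetry {

    @Override
    public ClientTelemetryReceiver clientReceiver() {
        // DynamicMetricReporterState registers this receiver with the
        // ClientMetricsReceiverPlugin, which then hands it every accepted push.
        return (context, payload) ->
            System.out.printf("client %s pushed %d bytes (%s)%n",
                payload.clientInstanceId(), payload.data().remaining(), payload.contentType());
    }

    // MetricsReporter plumbing, unused in this sketch.
    @Override public void init(List<KafkaMetric> metrics) { }
    @Override public void metricChange(KafkaMetric metric) { }
    @Override public void metricRemoval(KafkaMetric metric) { }
    @Override public void close() { }
    @Override public void configure(Map<String, ?> configs) { }
}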
17 changes: 17 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -232,6 +232,9 @@
val KafkaMetricReporterClasses = ""
val KafkaMetricsPollingIntervalSeconds = 10

/** ********* Kafka Client Telemetry Metrics Configuration ***********/
val ClientTelemetryMaxBytes = 1024 * 1024

/** ********* SSL configuration ***********/
val SslProtocol = SslConfigs.DEFAULT_SSL_PROTOCOL
val SslEnabledProtocols = SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS
@@ -589,6 +592,9 @@ object KafkaConfig {
val KafkaMetricsReporterClassesProp = "kafka.metrics.reporters"
val KafkaMetricsPollingIntervalSecondsProp = "kafka.metrics.polling.interval.secs"

/** ********* Kafka Client Telemetry Metrics Configuration ***********/
val ClientTelemetryMaxBytesProp = "telemetry.max.bytes"

/** ******** Common Security Configuration *************/
val PrincipalBuilderClassProp = BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG
val ConnectionsMaxReauthMsProp = BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS
@@ -1092,6 +1098,10 @@ object KafkaConfig {
val KafkaMetricsPollingIntervalSecondsDoc = s"The metrics polling interval (in seconds) which can be used" +
s" in $KafkaMetricsReporterClassesProp implementations."

/** ********* Kafka Client Telemetry Metrics Configuration ***********/
val ClientTelemetryMaxBytesDoc = "The maximum size (after compression if compression is used) of" +
" telemetry metrics pushed from a client to the broker. The default value is 1048576 (1 MB)."

/** ******** Common Security Configuration *************/
val PrincipalBuilderClassDoc = BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_DOC
val ConnectionsMaxReauthMsDoc = BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS_DOC
@@ -1419,6 +1429,9 @@ object KafkaConfig {
.define(KafkaMetricsReporterClassesProp, LIST, Defaults.KafkaMetricReporterClasses, LOW, KafkaMetricsReporterClassesDoc)
.define(KafkaMetricsPollingIntervalSecondsProp, INT, Defaults.KafkaMetricsPollingIntervalSeconds, atLeast(1), LOW, KafkaMetricsPollingIntervalSecondsDoc)

/** ********* Kafka Client Telemetry Metrics Configuration ***********/
.define(ClientTelemetryMaxBytesProp, INT, Defaults.ClientTelemetryMaxBytes, atLeast(1), LOW, ClientTelemetryMaxBytesDoc)

/** ********* Quota configuration ***********/
.define(NumQuotaSamplesProp, INT, Defaults.NumQuotaSamples, atLeast(1), LOW, NumQuotaSamplesDoc)
.define(NumReplicationQuotaSamplesProp, INT, Defaults.NumReplicationQuotaSamples, atLeast(1), LOW, NumReplicationQuotaSamplesDoc)
@@ -1586,6 +1599,7 @@ object KafkaConfig {
case ConfigResource.Type.BROKER => KafkaConfig.maybeSensitive(KafkaConfig.configType(name))
case ConfigResource.Type.TOPIC => KafkaConfig.maybeSensitive(LogConfig.configType(name).asScala)
case ConfigResource.Type.BROKER_LOGGER => false
case ConfigResource.Type.CLIENT_METRICS => false
case _ => true
}
if (maybeSensitive) Password.HIDDEN else value
@@ -2033,6 +2047,9 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
val metricSampleWindowMs = getLong(KafkaConfig.MetricSampleWindowMsProp)
val metricRecordingLevel = getString(KafkaConfig.MetricRecordingLevelProp)

/** ********* Kafka Client Telemetry Metrics Configuration ***********/
val clientTelemetryMaxBytes: Int = getInt(KafkaConfig.ClientTelemetryMaxBytesProp)

/** ********* SSL/SASL Configuration **************/
// Security configs may be overridden for listeners, so it is not safe to use the base values
// Hence the base SSL/SASL configs are not fields of KafkaConfig, listener configs should be
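Because the new telemetry.max.bytes property defined above is a static broker config (INT, at least 1, LOW importance), it can be tuned like any other entry in server.properties; a hypothetical override doubling the default:

# Accept client telemetry pushes of up to 2 MB after compression (default 1048576).
telemetry.max.bytes=2097152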
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaServer.scala
@@ -255,7 +255,7 @@ class KafkaServer(

// initialize dynamic broker configs from ZooKeeper. Any updates made after this will be
// applied after ZkConfigManager starts.
config.dynamicConfig.initialize(Some(zkClient))
config.dynamicConfig.initialize(Some(zkClient), clientMetricsReceiverPluginOpt = None)

/* start scheduler */
kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/SharedServer.scala
@@ -244,7 +244,7 @@ class SharedServer(
// This is only done in tests.
metrics = new Metrics()
}
sharedServerConfig.dynamicConfig.initialize(zkClientOpt = None)
sharedServerConfig.dynamicConfig.initialize(zkClientOpt = None, clientMetricsReceiverPluginOpt = None)

if (sharedServerConfig.processRoles.contains(BrokerRole)) {
brokerMetrics = BrokerServerMetrics(metrics)