From 7d9bb1a8c8bbeada6a536ea44404d3d6bc15a467 Mon Sep 17 00:00:00 2001
From: Apoorv Mittal
Date: Wed, 29 Nov 2023 22:50:07 +0530
Subject: [PATCH] KAFKA-15778 & KAFKA-15779: Implement metrics manager
 (KIP-714) (#14699)

The PR provides the implementation of the client metrics manager along
with supporting classes. The manager supports three operations:

UpdateSubscription - from kafka-configs.sh and on reload from the
metadata cache.
Process Get Telemetry Request - from KafkaApis.scala.
Process Push Telemetry Request - from KafkaApis.scala.

The manager maintains an in-memory cache that tracks client instances
by their client instance id.

Reviewers: Andrew Schofield, Jun Rao
---
 checkstyle/import-control-server.xml          |  11 +
 .../errors/TelemetryTooLargeException.java    |  28 +
 .../UnknownSubscriptionIdException.java       |  28 +
 .../apache/kafka/common/protocol/Errors.java  |   6 +-
 .../common/requests/PushTelemetryRequest.java |  34 +-
 .../scala/kafka/server/BrokerServer.scala     |   7 +-
 .../scala/kafka/server/ControllerServer.scala |  10 +-
 .../kafka/server/DynamicBrokerConfig.scala    |  23 +-
 .../main/scala/kafka/server/KafkaConfig.scala |  17 +
 .../main/scala/kafka/server/KafkaServer.scala |   2 +-
 .../scala/kafka/server/SharedServer.scala     |   2 +-
 .../metadata/DynamicConfigPublisher.scala     |  13 +-
 .../server/DynamicBrokerConfigTest.scala      |  40 +-
 .../kafka/server/ReplicaManagerTest.scala     |   4 +-
 .../kafka/server/ClientMetricsManager.java    | 396 +++++++-
 .../server/metrics/ClientMetricsConfigs.java  |  74 +-
 .../server/metrics/ClientMetricsInstance.java | 123 +++
 .../ClientMetricsInstanceMetadata.java        |  72 ++
 .../metrics/ClientMetricsReceiverPlugin.java  |  56 ++
 .../DefaultClientTelemetryPayload.java        |  61 ++
 .../server/ClientMetricsManagerTest.java      | 922 ++++++++++++++++++
 .../ClientMetricsInstanceMetadataTest.java    | 134 +++
 .../metrics/ClientMetricsInstanceTest.java    |  89 ++
 .../ClientMetricsReceiverPluginTest.java      |  61 ++
 .../metrics/ClientMetricsTestUtils.java       |  49 +
 25 files changed, 2179 insertions(+), 83 deletions(-)
 create mode 100644 clients/src/main/java/org/apache/kafka/common/errors/TelemetryTooLargeException.java
 create mode 100644 clients/src/main/java/org/apache/kafka/common/errors/UnknownSubscriptionIdException.java
 create mode 100644 server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsInstance.java
 create mode 100644 server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsInstanceMetadata.java
 create mode 100644 server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsReceiverPlugin.java
 create mode 100644 server/src/main/java/org/apache/kafka/server/metrics/DefaultClientTelemetryPayload.java
 create mode 100644 server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java
 create mode 100644 server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsInstanceMetadataTest.java
 create mode 100644 server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsInstanceTest.java
 create mode 100644 server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsReceiverPluginTest.java

diff --git a/checkstyle/import-control-server.xml b/checkstyle/import-control-server.xml
index 499f8327ab607..765f6faf4c15d 100644
--- a/checkstyle/import-control-server.xml
+++ b/checkstyle/import-control-server.xml
@@ -38,17 +38,20 @@
+
+
+
@@ -58,4 +61,12 @@
+
+
+
+
+
+
+
+
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/TelemetryTooLargeException.java b/clients/src/main/java/org/apache/kafka/common/errors/TelemetryTooLargeException.java
new file mode 100644
index
0000000000000..4be6ef50a4b83 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/TelemetryTooLargeException.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +/** + * This exception indicates that the size of the telemetry metrics data is too large. + */ +public class TelemetryTooLargeException extends ApiException { + + public TelemetryTooLargeException(String message) { + super(message); + } +} + diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnknownSubscriptionIdException.java b/clients/src/main/java/org/apache/kafka/common/errors/UnknownSubscriptionIdException.java new file mode 100644 index 0000000000000..e2041db0ed9d1 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/UnknownSubscriptionIdException.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.errors; + +/** + * This exception indicates that the client sent an invalid or outdated SubscriptionId + */ +public class UnknownSubscriptionIdException extends ApiException { + + public UnknownSubscriptionIdException(String message) { + super(message); + } +} + diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index e2d57278ef881..8b4c78f890cff 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.BrokerIdNotRegisteredException; import org.apache.kafka.common.errors.BrokerNotAvailableException; +import org.apache.kafka.common.errors.TelemetryTooLargeException; import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.ConcurrentTransactionsException; import org.apache.kafka.common.errors.ControllerMovedException; @@ -118,6 +119,7 @@ import org.apache.kafka.common.errors.TransactionCoordinatorFencedException; import org.apache.kafka.common.errors.TransactionalIdAuthorizationException; import org.apache.kafka.common.errors.TransactionalIdNotFoundException; +import org.apache.kafka.common.errors.UnknownSubscriptionIdException; import org.apache.kafka.common.errors.UnacceptableCredentialException; import org.apache.kafka.common.errors.UnknownControllerIdException; import org.apache.kafka.common.errors.UnknownLeaderEpochException; @@ -386,7 +388,9 @@ public enum Errors { STALE_MEMBER_EPOCH(113, "The member epoch is stale. The member must retry after receiving its updated member epoch via the ConsumerGroupHeartbeat API.", StaleMemberEpochException::new), MISMATCHED_ENDPOINT_TYPE(114, "The request was sent to an endpoint of the wrong type.", MismatchedEndpointTypeException::new), UNSUPPORTED_ENDPOINT_TYPE(115, "This endpoint type is not supported yet.", UnsupportedEndpointTypeException::new), - UNKNOWN_CONTROLLER_ID(116, "This controller ID is not known.", UnknownControllerIdException::new); + UNKNOWN_CONTROLLER_ID(116, "This controller ID is not known.", UnknownControllerIdException::new), + UNKNOWN_SUBSCRIPTION_ID(117, "Client sent a push telemetry request with an invalid or outdated subscription ID.", UnknownSubscriptionIdException::new), + TELEMETRY_TOO_LARGE(118, "Client sent a push telemetry request larger than the maximum size the broker will accept.", TelemetryTooLargeException::new); private static final Logger log = LoggerFactory.getLogger(Errors.class); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java index 5df03ed3461b0..07827ebff7b49 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. 
  */
-
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.message.PushTelemetryRequestData;
@@ -22,11 +21,14 @@
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
 
 import java.nio.ByteBuffer;
 
 public class PushTelemetryRequest extends AbstractRequest {
 
+    private static final String OTLP_CONTENT_TYPE = "OTLP";
+
     public static class Builder extends AbstractRequest.Builder<PushTelemetryRequest> {
 
         private final PushTelemetryRequestData data;
@@ -60,10 +62,7 @@ public PushTelemetryRequest(PushTelemetryRequestData data, short version) {
 
     @Override
     public PushTelemetryResponse getErrorResponse(int throttleTimeMs, Throwable e) {
-        PushTelemetryResponseData responseData = new PushTelemetryResponseData()
-            .setErrorCode(Errors.forException(e).code())
-            .setThrottleTimeMs(throttleTimeMs);
-        return new PushTelemetryResponse(responseData);
+        return errorResponse(throttleTimeMs, Errors.forException(e));
     }
 
     @Override
@@ -71,6 +70,31 @@ public PushTelemetryRequestData data() {
         return data;
     }
 
+    public PushTelemetryResponse errorResponse(int throttleTimeMs, Errors errors) {
+        PushTelemetryResponseData responseData = new PushTelemetryResponseData();
+        responseData.setErrorCode(errors.code());
+        responseData.setThrottleTimeMs(throttleTimeMs);
+        return new PushTelemetryResponse(responseData);
+    }
+
+    public String metricsContentType() {
+        // Future versions of PushTelemetryRequest and GetTelemetrySubscriptionsRequest may include a content-type
+        // field to allow for updated OTLP format versions (or additional formats), but this field is currently not
+        // included since only one format is specified in the current proposal of KIP-714.
+        return OTLP_CONTENT_TYPE;
+    }
+
+    public ByteBuffer metricsData() {
+        CompressionType cType = CompressionType.forId(this.data.compressionType());
+        return (cType == CompressionType.NONE) ?
+ ByteBuffer.wrap(this.data.metrics()) : decompressMetricsData(cType, this.data.metrics()); + } + + private static ByteBuffer decompressMetricsData(CompressionType compressionType, byte[] metrics) { + // TODO: Add support for decompression of metrics data + return ByteBuffer.wrap(metrics); + } + public static PushTelemetryRequest parse(ByteBuffer buffer, short version) { return new PushTelemetryRequest(new PushTelemetryRequestData( new ByteBufferAccessor(buffer), version), version); diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 18d94be7aa8c9..4d5731a3fdc41 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -46,7 +46,7 @@ import org.apache.kafka.server.{AssignmentsManager, ClientMetricsManager, NodeTo import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler, TopicIdPartition} import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig -import org.apache.kafka.server.metrics.KafkaYammerMetrics +import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin, KafkaYammerMetrics} import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo} import org.apache.kafka.server.util.timer.SystemTimer import org.apache.kafka.server.util.{Deadline, FutureUtils, KafkaScheduler} @@ -177,7 +177,8 @@ class BrokerServer( info("Starting broker") - config.dynamicConfig.initialize(zkClientOpt = None) + val clientMetricsReceiverPlugin = new ClientMetricsReceiverPlugin() + config.dynamicConfig.initialize(zkClientOpt = None, Some(clientMetricsReceiverPlugin)) /* start scheduler */ kafkaScheduler = new KafkaScheduler(config.backgroundThreads) @@ -346,7 +347,7 @@ class BrokerServer( config, Some(clientToControllerChannelManager), None, None, groupCoordinator, transactionCoordinator) - clientMetricsManager = ClientMetricsManager.instance() + clientMetricsManager = new ClientMetricsManager(clientMetricsReceiverPlugin, config.clientTelemetryMaxBytes, time) dynamicConfigHandlers = Map[String, ConfigHandler]( ConfigType.Topic -> new TopicConfigHandler(replicaManager, config, quotaManagers, None), diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index 1f08cdffd0570..a7d91ca9df931 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -44,7 +44,7 @@ import org.apache.kafka.metadata.bootstrap.BootstrapMetadata import org.apache.kafka.metadata.migration.{KRaftMigrationDriver, LegacyPropagator} import org.apache.kafka.metadata.publisher.FeaturesPublisher import org.apache.kafka.raft.RaftConfig -import org.apache.kafka.server.{ClientMetricsManager, NodeToControllerChannelManager} +import org.apache.kafka.server.NodeToControllerChannelManager import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.common.ApiMessageAndVersion import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics} @@ -125,7 +125,6 @@ class ControllerServer( @volatile var incarnationId: Uuid = _ @volatile var registrationManager: ControllerRegistrationManager = _ @volatile var registrationChannelManager: NodeToControllerChannelManager = _ - var clientMetricsManager: ClientMetricsManager = _ private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus): Boolean = { lock.lock() @@ 
-147,7 +146,7 @@ class ControllerServer( try { this.logIdent = logContext.logPrefix() info("Starting controller") - config.dynamicConfig.initialize(zkClientOpt = None) + config.dynamicConfig.initialize(zkClientOpt = None, clientMetricsReceiverPluginOpt = None) maybeChangeStatus(STARTING, STARTED) @@ -334,8 +333,6 @@ class ControllerServer( DataPlaneAcceptor.ThreadPrefix, "controller") - clientMetricsManager = ClientMetricsManager.instance() - // Set up the metadata cache publisher. metadataPublishers.add(metadataCachePublisher) @@ -366,8 +363,7 @@ class ControllerServer( sharedServer.metadataPublishingFaultHandler, immutable.Map[String, ConfigHandler]( // controllers don't host topics, so no need to do anything with dynamic topic config changes here - ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers), - ConfigType.ClientMetrics -> new ClientMetricsConfigHandler(clientMetricsManager) + ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers) ), "controller")) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index eac519b1dece8..2acbee89756c7 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -38,6 +38,8 @@ import org.apache.kafka.common.security.authenticator.LoginManager import org.apache.kafka.common.utils.{ConfigUtils, Utils} import org.apache.kafka.server.config.ServerTopicConfigSynonyms import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig +import org.apache.kafka.server.metrics.ClientMetricsReceiverPlugin +import org.apache.kafka.server.telemetry.ClientTelemetry import org.apache.kafka.storage.internals.log.{LogConfig, ProducerStateManagerConfig} import scala.annotation.nowarn @@ -216,6 +218,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging private[server] val reconfigurables = new CopyOnWriteArrayList[Reconfigurable]() private val brokerReconfigurables = new CopyOnWriteArrayList[BrokerReconfigurable]() private val lock = new ReentrantReadWriteLock + private var metricsReceiverPluginOpt: Option[ClientMetricsReceiverPlugin] = _ private var currentConfig: KafkaConfig = _ private val dynamicConfigPasswordEncoder = if (kafkaConfig.processRoles.isEmpty) { maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret) @@ -223,8 +226,9 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging Some(PasswordEncoder.noop()) } - private[server] def initialize(zkClientOpt: Option[KafkaZkClient]): Unit = { + private[server] def initialize(zkClientOpt: Option[KafkaZkClient], clientMetricsReceiverPluginOpt: Option[ClientMetricsReceiverPlugin]): Unit = { currentConfig = new KafkaConfig(kafkaConfig.props, false, None) + metricsReceiverPluginOpt = clientMetricsReceiverPluginOpt zkClientOpt.foreach { zkClient => val adminZkClient = new AdminZkClient(zkClient) @@ -327,6 +331,10 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging dynamicDefaultConfigs.clone() } + private[server] def clientMetricsReceiverPlugin: Option[ClientMetricsReceiverPlugin] = CoreUtils.inReadLock(lock) { + metricsReceiverPluginOpt + } + private[server] def updateBrokerConfig(brokerId: Int, persistentProps: Properties, doLog: Boolean = true): Unit = CoreUtils.inWriteLock(lock) { try { val props = fromPersistentProps(persistentProps, perBrokerConfig = true) @@ -913,6 +921,19 @@ class DynamicMetricReporterState(brokerId: Int, 
config: KafkaConfig, metrics: Me reporters.forEach { reporter => metrics.addReporter(reporter) currentReporters += reporter.getClass.getName -> reporter + val clientTelemetryReceiver = reporter match { + case telemetry: ClientTelemetry => telemetry.clientReceiver() + case _ => null + } + + if (clientTelemetryReceiver != null) { + dynamicConfig.clientMetricsReceiverPlugin match { + case Some(receiverPlugin) => + receiverPlugin.add(clientTelemetryReceiver) + case None => + // Do nothing + } + } } KafkaBroker.notifyClusterListeners(clusterId, reporters.asScala) } diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 63366a35b2def..42896a5c33dbf 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -232,6 +232,9 @@ object Defaults { val KafkaMetricReporterClasses = "" val KafkaMetricsPollingIntervalSeconds = 10 + /** ********* Kafka Client Telemetry Metrics Configuration ***********/ + val ClientTelemetryMaxBytes = 1024 * 1024 + /** ********* SSL configuration ***********/ val SslProtocol = SslConfigs.DEFAULT_SSL_PROTOCOL val SslEnabledProtocols = SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS @@ -589,6 +592,9 @@ object KafkaConfig { val KafkaMetricsReporterClassesProp = "kafka.metrics.reporters" val KafkaMetricsPollingIntervalSecondsProp = "kafka.metrics.polling.interval.secs" + /** ********* Kafka Client Telemetry Metrics Configuration ***********/ + val ClientTelemetryMaxBytesProp = "telemetry.max.bytes" + /** ******** Common Security Configuration *************/ val PrincipalBuilderClassProp = BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG val ConnectionsMaxReauthMsProp = BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS @@ -1092,6 +1098,10 @@ object KafkaConfig { val KafkaMetricsPollingIntervalSecondsDoc = s"The metrics polling interval (in seconds) which can be used" + s" in $KafkaMetricsReporterClassesProp implementations." + /** ********* Kafka Client Telemetry Metrics Configuration ***********/ + val ClientTelemetryMaxBytesDoc = "The maximum size (after compression if compression is used) of" + + " telemetry metrics pushed from a client to the broker. The default value is 1048576 (1 MB)." 
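+  // Editor's note: a hedged usage sketch. Per this patch, `telemetry.max.bytes` is a static
+  // broker property (it is not registered as dynamically reconfigurable here), so an operator
+  // would raise the limit from the 1 MiB default in server.properties; the value below is
+  // invented:
+  //
+  //   telemetry.max.bytes=2097152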
+ /** ******** Common Security Configuration *************/ val PrincipalBuilderClassDoc = BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_DOC val ConnectionsMaxReauthMsDoc = BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS_DOC @@ -1419,6 +1429,9 @@ object KafkaConfig { .define(KafkaMetricsReporterClassesProp, LIST, Defaults.KafkaMetricReporterClasses, LOW, KafkaMetricsReporterClassesDoc) .define(KafkaMetricsPollingIntervalSecondsProp, INT, Defaults.KafkaMetricsPollingIntervalSeconds, atLeast(1), LOW, KafkaMetricsPollingIntervalSecondsDoc) + /** ********* Kafka Client Telemetry Metrics Configuration ***********/ + .define(ClientTelemetryMaxBytesProp, INT, Defaults.ClientTelemetryMaxBytes, atLeast(1), LOW, ClientTelemetryMaxBytesDoc) + /** ********* Quota configuration ***********/ .define(NumQuotaSamplesProp, INT, Defaults.NumQuotaSamples, atLeast(1), LOW, NumQuotaSamplesDoc) .define(NumReplicationQuotaSamplesProp, INT, Defaults.NumReplicationQuotaSamples, atLeast(1), LOW, NumReplicationQuotaSamplesDoc) @@ -1586,6 +1599,7 @@ object KafkaConfig { case ConfigResource.Type.BROKER => KafkaConfig.maybeSensitive(KafkaConfig.configType(name)) case ConfigResource.Type.TOPIC => KafkaConfig.maybeSensitive(LogConfig.configType(name).asScala) case ConfigResource.Type.BROKER_LOGGER => false + case ConfigResource.Type.CLIENT_METRICS => false case _ => true } if (maybeSensitive) Password.HIDDEN else value @@ -2033,6 +2047,9 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami val metricSampleWindowMs = getLong(KafkaConfig.MetricSampleWindowMsProp) val metricRecordingLevel = getString(KafkaConfig.MetricRecordingLevelProp) + /** ********* Kafka Client Telemetry Metrics Configuration ***********/ + val clientTelemetryMaxBytes: Int = getInt(KafkaConfig.ClientTelemetryMaxBytesProp) + /** ********* SSL/SASL Configuration **************/ // Security configs may be overridden for listeners, so it is not safe to use the base values // Hence the base SSL/SASL configs are not fields of KafkaConfig, listener configs should be diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 072d47e33499a..2a43f7f5e283d 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -255,7 +255,7 @@ class KafkaServer( // initialize dynamic broker configs from ZooKeeper. Any updates made after this will be // applied after ZkConfigManager starts. - config.dynamicConfig.initialize(Some(zkClient)) + config.dynamicConfig.initialize(Some(zkClient), clientMetricsReceiverPluginOpt = None) /* start scheduler */ kafkaScheduler = new KafkaScheduler(config.backgroundThreads) diff --git a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala index abf7e0fa0decb..a8733005eed10 100644 --- a/core/src/main/scala/kafka/server/SharedServer.scala +++ b/core/src/main/scala/kafka/server/SharedServer.scala @@ -244,7 +244,7 @@ class SharedServer( // This is only done in tests. 
        metrics = new Metrics()
      }
-    sharedServerConfig.dynamicConfig.initialize(zkClientOpt = None)
+    sharedServerConfig.dynamicConfig.initialize(zkClientOpt = None, clientMetricsReceiverPluginOpt = None)
 
     if (sharedServerConfig.processRoles.contains(BrokerRole)) {
       brokerMetrics = BrokerServerMetrics(metrics)
diff --git a/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala b/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala
index b03648a3ef503..03122b5ef1ba2 100644
--- a/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala
@@ -103,9 +103,16 @@ class DynamicConfigPublisher(
             )
           case CLIENT_METRICS =>
             // Apply changes to client metrics subscription.
-            info(s"Updating client metrics subscription ${resource.name()} with new configuration : " +
-              toLoggableProps(resource, props).mkString(","))
-            dynamicConfigHandlers(ConfigType.ClientMetrics).processConfigChanges(resource.name(), props)
+            dynamicConfigHandlers.get(ConfigType.ClientMetrics).foreach(metricsConfigHandler =>
+              try {
+                info(s"Updating client metrics ${resource.name()} with new configuration: " +
+                  toLoggableProps(resource, props).mkString(","))
+                metricsConfigHandler.processConfigChanges(resource.name(), props)
+              } catch {
+                case t: Throwable => faultHandler.handleFault("Error updating client metrics " +
+                  s"${resource.name()} with new configuration: ${toLoggableProps(resource, props).mkString(",")} " +
+                  s"in $deltaName", t)
+              })
           case _ => // nothing to do
         }
       }
diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 861bbfe36117d..e6537edb02560 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -58,7 +58,7 @@ class DynamicBrokerConfigTest {
     props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, oldKeystore)
     val config = KafkaConfig(props)
     val dynamicConfig = config.dynamicConfig
-    dynamicConfig.initialize(None)
+    dynamicConfig.initialize(None, None)
 
     assertEquals(config, dynamicConfig.currentKafkaConfig)
     assertEquals(oldKeystore, config.values.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
@@ -116,7 +116,7 @@ class DynamicBrokerConfigTest {
     Mockito.when(logManagerMock.reconfigureDefaultLogConfig(ArgumentMatchers.any(classOf[LogConfig])))
       .thenAnswer(invocation => currentDefaultLogConfig.set(invocation.getArgument(0)))
 
-    config.dynamicConfig.initialize(None)
+    config.dynamicConfig.initialize(None, None)
     config.dynamicConfig.addBrokerReconfigurable(new DynamicLogConfig(logManagerMock, serverMock))
 
     val props = new Properties()
@@ -155,7 +155,7 @@ class DynamicBrokerConfigTest {
     Mockito.when(serverMock.logManager).thenReturn(logManagerMock)
     Mockito.when(serverMock.kafkaScheduler).thenReturn(schedulerMock)
 
-    config.dynamicConfig.initialize(None)
+    config.dynamicConfig.initialize(None, None)
     config.dynamicConfig.addBrokerReconfigurable(new BrokerDynamicThreadPool(serverMock))
     config.dynamicConfig.addReconfigurable(acceptorMock)
 
@@ -204,7 +204,7 @@ class DynamicBrokerConfigTest {
     val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
     origProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS")
     val config = KafkaConfig(origProps)
-    config.dynamicConfig.initialize(None)
+    config.dynamicConfig.initialize(None, None)
 
     val validProps =
Map(s"listener.name.external.${SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG}" -> "ks.p12") @@ -226,7 +226,7 @@ class DynamicBrokerConfigTest { val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) origProps.put(KafkaConfig.LogCleanerDedupeBufferSizeProp, "100000000") val config = KafkaConfig(origProps) - config.dynamicConfig.initialize(None) + config.dynamicConfig.initialize(None, None) val validProps = Map.empty[String, String] val invalidProps = Map(KafkaConfig.LogCleanerThreadsProp -> "20") @@ -330,7 +330,7 @@ class DynamicBrokerConfigTest { val configProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) configProps.put(KafkaConfig.PasswordEncoderSecretProp, "broker.secret") val config = KafkaConfig(configProps) - config.dynamicConfig.initialize(None) + config.dynamicConfig.initialize(None, None) val props = new Properties props.put(name, value) @@ -402,7 +402,7 @@ class DynamicBrokerConfigTest { props.put(KafkaConfig.SaslJaasConfigProp, "staticLoginModule required;") props.put(KafkaConfig.PasswordEncoderSecretProp, "config-encoder-secret") val config = KafkaConfig(props) - config.dynamicConfig.initialize(None) + config.dynamicConfig.initialize(None, None) val dynamicProps = new Properties dynamicProps.put(KafkaConfig.SaslJaasConfigProp, "dynamicLoginModule required;") @@ -414,7 +414,7 @@ class DynamicBrokerConfigTest { // New config with same secret should use the dynamic password config val newConfigWithSameSecret = KafkaConfig(props) - newConfigWithSameSecret.dynamicConfig.initialize(None) + newConfigWithSameSecret.dynamicConfig.initialize(None, None) newConfigWithSameSecret.dynamicConfig.updateBrokerConfig(0, persistedProps) assertEquals("dynamicLoginModule required;", newConfigWithSameSecret.values.get(KafkaConfig.SaslJaasConfigProp).asInstanceOf[Password].value) @@ -478,7 +478,7 @@ class DynamicBrokerConfigTest { def testAuthorizerConfig(): Unit = { val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092) val oldConfig = KafkaConfig.fromProps(props) - oldConfig.dynamicConfig.initialize(None) + oldConfig.dynamicConfig.initialize(None, None) val kafkaServer: KafkaServer = mock(classOf[kafka.server.KafkaServer]) when(kafkaServer.config).thenReturn(oldConfig) @@ -525,7 +525,7 @@ class DynamicBrokerConfigTest { def testCombinedControllerAuthorizerConfig(): Unit = { val props = createCombinedControllerConfig(0, 9092) val oldConfig = KafkaConfig.fromProps(props) - oldConfig.dynamicConfig.initialize(None) + oldConfig.dynamicConfig.initialize(None, None) val controllerServer: ControllerServer = mock(classOf[kafka.server.ControllerServer]) when(controllerServer.config).thenReturn(oldConfig) @@ -571,7 +571,7 @@ class DynamicBrokerConfigTest { def testIsolatedControllerAuthorizerConfig(): Unit = { val props = createIsolatedControllerConfig(0, port = 9092) val oldConfig = KafkaConfig.fromProps(props) - oldConfig.dynamicConfig.initialize(None) + oldConfig.dynamicConfig.initialize(None, None) val controllerServer: ControllerServer = mock(classOf[kafka.server.ControllerServer]) when(controllerServer.config).thenReturn(oldConfig) @@ -615,7 +615,7 @@ class DynamicBrokerConfigTest { initialProps.remove(KafkaConfig.BackgroundThreadsProp) val oldConfig = KafkaConfig.fromProps(initialProps) val dynamicBrokerConfig = new DynamicBrokerConfig(oldConfig) - dynamicBrokerConfig.initialize(Some(zkClient)) + dynamicBrokerConfig.initialize(Some(zkClient), None) dynamicBrokerConfig.addBrokerReconfigurable(new TestDynamicThreadPool) val 
newprops = new Properties() @@ -628,7 +628,7 @@ class DynamicBrokerConfigTest { def testImproperConfigsAreRemoved(): Unit = { val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect) val config = KafkaConfig(props) - config.dynamicConfig.initialize(None) + config.dynamicConfig.initialize(None, None) assertEquals(Defaults.MaxConnections, config.maxConnections) assertEquals(LogConfig.DEFAULT_MAX_MESSAGE_BYTES, config.messageMaxBytes) @@ -663,7 +663,7 @@ class DynamicBrokerConfigTest { Mockito.when(serverMock.config).thenReturn(config) - config.dynamicConfig.initialize(None) + config.dynamicConfig.initialize(None, None) val m = new DynamicMetricsReporters(brokerId, config, metrics, "clusterId") config.dynamicConfig.addReconfigurable(m) assertEquals(1, m.currentReporters.size) @@ -689,7 +689,7 @@ class DynamicBrokerConfigTest { Mockito.when(serverMock.config).thenReturn(config) - config.dynamicConfig.initialize(None) + config.dynamicConfig.initialize(None, None) val m = new DynamicMetricsReporters(brokerId, config, metrics, "clusterId") config.dynamicConfig.addReconfigurable(m) assertTrue(m.currentReporters.isEmpty) @@ -722,7 +722,7 @@ class DynamicBrokerConfigTest { props.put(KafkaConfig.LogRetentionTimeMillisProp, "2592000000") val config = KafkaConfig(props) val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), mock(classOf[KafkaServer])) - config.dynamicConfig.initialize(None) + config.dynamicConfig.initialize(None, None) config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig) val newProps = new Properties() @@ -745,7 +745,7 @@ class DynamicBrokerConfigTest { props.put(KafkaConfig.LogRetentionBytesProp, "4294967296") val config = KafkaConfig(props) val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), mock(classOf[KafkaServer])) - config.dynamicConfig.initialize(None) + config.dynamicConfig.initialize(None, None) config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig) val newProps = new Properties() @@ -768,7 +768,7 @@ class DynamicBrokerConfigTest { props.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, "1000") props.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, "1024") val config = KafkaConfig(props) - config.dynamicConfig.initialize(None) + config.dynamicConfig.initialize(None, None) // Check for invalid localRetentionMs < -2 verifyConfigUpdateWithInvalidConfig(config, props, Map.empty, Map(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP -> "-3")) @@ -800,7 +800,7 @@ class DynamicBrokerConfigTest { Mockito.when(serverMock.config).thenReturn(config) Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(remoteLogManagerMockOpt) - config.dynamicConfig.initialize(None) + config.dynamicConfig.initialize(None, None) config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock)) val props = new Properties() @@ -822,7 +822,7 @@ class DynamicBrokerConfigTest { props.put(KafkaConfig.LogRetentionBytesProp, retentionBytes.toString) val config = KafkaConfig(props) val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), mock(classOf[KafkaServer])) - config.dynamicConfig.initialize(None) + config.dynamicConfig.initialize(None, None) config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig) val newProps = new Properties() diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 7bfb2e4a8d5d2..a4ececf219752 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -2434,7 +2434,7 @@ class ReplicaManagerTest { verify(addPartitionsToTxnManager, times(0)).verifyTransaction(any(), any(), any(), any(), any()) // Dynamically enable verification. - config.dynamicConfig.initialize(None) + config.dynamicConfig.initialize(None, None) val props = new Properties() props.put(KafkaConfig.TransactionPartitionVerificationEnableProp, "true") config.dynamicConfig.updateBrokerConfig(config.brokerId, props) @@ -2485,7 +2485,7 @@ class ReplicaManagerTest { assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId)) // Disable verification - config.dynamicConfig.initialize(None) + config.dynamicConfig.initialize(None, None) val props = new Properties() props.put(KafkaConfig.TransactionPartitionVerificationEnableProp, "false") config.dynamicConfig.updateBrokerConfig(config.brokerId, props) diff --git a/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java index cd5a18be776a7..5023ebd81d734 100644 --- a/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java +++ b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java @@ -16,12 +16,50 @@ */ package org.apache.kafka.server; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.cache.Cache; +import org.apache.kafka.common.cache.LRUCache; +import org.apache.kafka.common.cache.SynchronizedCache; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.TelemetryTooLargeException; +import org.apache.kafka.common.errors.ThrottlingQuotaExceededException; +import org.apache.kafka.common.errors.UnknownSubscriptionIdException; +import org.apache.kafka.common.errors.UnsupportedCompressionTypeException; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import org.apache.kafka.common.message.PushTelemetryResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.utils.Crc32C; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.metrics.ClientMetricsConfigs; +import org.apache.kafka.server.metrics.ClientMetricsInstance; +import org.apache.kafka.server.metrics.ClientMetricsInstanceMetadata; +import org.apache.kafka.server.metrics.ClientMetricsReceiverPlugin; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Pattern; /** * Handles client telemetry metrics requests/responses, subscriptions and instance information. 
@@ -29,18 +67,366 @@ public class ClientMetricsManager implements Closeable {
 
     private static final Logger log = LoggerFactory.getLogger(ClientMetricsManager.class);
-    private static final ClientMetricsManager INSTANCE = new ClientMetricsManager();
+    private static final List<Byte> SUPPORTED_COMPRESSION_TYPES = Collections.unmodifiableList(
+        Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, CompressionType.GZIP.id, CompressionType.SNAPPY.id));
+    // Max cache size (16k active client connections per broker)
+    private static final int CM_CACHE_MAX_SIZE = 16384;
 
-    public static ClientMetricsManager instance() {
-        return INSTANCE;
+    private final ClientMetricsReceiverPlugin receiverPlugin;
+    private final Cache<Uuid, ClientMetricsInstance> clientInstanceCache;
+    private final Map<String, SubscriptionInfo> subscriptionMap;
+    private final int clientTelemetryMaxBytes;
+    private final Time time;
+
+    // The latest subscription version is used to determine whether the subscriptions have changed
+    // and the client instance's subscription id needs to be re-evaluated against the changed subscriptions.
+    private final AtomicInteger subscriptionUpdateVersion;
+
+    public ClientMetricsManager(ClientMetricsReceiverPlugin receiverPlugin, int clientTelemetryMaxBytes, Time time) {
+        this.receiverPlugin = receiverPlugin;
+        this.subscriptionMap = new ConcurrentHashMap<>();
+        this.subscriptionUpdateVersion = new AtomicInteger(0);
+        this.clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE));
+        this.clientTelemetryMaxBytes = clientTelemetryMaxBytes;
+        this.time = time;
     }
 
     public void updateSubscription(String subscriptionName, Properties properties) {
-        // TODO: Implement the update logic to manage subscriptions.
+        // Validate the subscription properties.
+        ClientMetricsConfigs.validate(subscriptionName, properties);
+        // The IncrementalAlterConfigs API sends empty configs when all the configs are deleted
+        // for the respective subscription. In that case, remove the subscription from the map.
+        if (properties.isEmpty()) {
+            // Remove the subscription from the map if it exists, else ignore the config update.
+            if (subscriptionMap.containsKey(subscriptionName)) {
+                log.info("Removing subscription [{}] from the subscription map", subscriptionName);
+                subscriptionMap.remove(subscriptionName);
+                subscriptionUpdateVersion.incrementAndGet();
+            }
+            return;
+        }
+
+        updateClientSubscription(subscriptionName, new ClientMetricsConfigs(properties));
+        /*
+         Increment the subscription update version to indicate that the subscriptions have
+         changed. This is used to determine whether the next telemetry request needs to
+         re-evaluate the subscription id against the changed subscriptions.
+        */
+        subscriptionUpdateVersion.incrementAndGet();
+    }
+
+    public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest(
+        GetTelemetrySubscriptionsRequest request, RequestContext requestContext) {
+
+        long now = time.milliseconds();
+        Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId())
+            .filter(id -> !id.equals(Uuid.ZERO_UUID))
+            .orElse(generateNewClientId());
+
+        /*
+         Get the client instance from the cache or create a new one. If the subscription has
+         changed since the last request, the client instance is re-evaluated. Validation of the
+         request is done after the client instance is created. If the client issues another get
+         telemetry request before the push interval elapses, it should receive a throttle error;
+         however, if the subscription has changed since the last request, the client should
+         receive the updated subscription immediately.
+        */
+        ClientMetricsInstance clientInstance = clientInstance(clientInstanceId, requestContext);
+
+        try {
+            // Validate the get request parameters for the client instance.
+            validateGetRequest(request, clientInstance, now);
+        } catch (ApiException exception) {
+            return request.getErrorResponse(0, exception);
+        }
+
+        clientInstance.lastKnownError(Errors.NONE);
+        return createGetSubscriptionResponse(clientInstanceId, clientInstance);
+    }
+
+    public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request, RequestContext requestContext) {
+
+        Uuid clientInstanceId = request.data().clientInstanceId();
+        if (clientInstanceId == null || Uuid.RESERVED.contains(clientInstanceId)) {
+            String msg = String.format("Invalid request from the client [%s], invalid client instance id",
+                clientInstanceId);
+            return request.getErrorResponse(0, new InvalidRequestException(msg));
+        }
+
+        long now = time.milliseconds();
+        ClientMetricsInstance clientInstance = clientInstance(clientInstanceId, requestContext);
+
+        try {
+            // Validate the push request parameters for the client instance.
+            validatePushRequest(request, clientInstance, now);
+        } catch (ApiException exception) {
+            log.debug("Error validating push telemetry request from client [{}]", clientInstanceId, exception);
+            clientInstance.lastKnownError(Errors.forException(exception));
+            return request.getErrorResponse(0, exception);
+        } finally {
+            // Update the client instance with the latest push request parameters.
+            clientInstance.terminating(request.data().terminating());
+        }
+
+        // Push the metrics to the external client receiver plugin.
+        byte[] metrics = request.data().metrics();
+        if (metrics != null && metrics.length > 0) {
+            try {
+                receiverPlugin.exportMetrics(requestContext, request);
+            } catch (Exception exception) {
+                clientInstance.lastKnownError(Errors.INVALID_RECORD);
+                return request.errorResponse(0, Errors.INVALID_RECORD);
+            }
+        }
+
+        clientInstance.lastKnownError(Errors.NONE);
+        return new PushTelemetryResponse(new PushTelemetryResponseData());
+    }
+
+    public boolean isTelemetryReceiverConfigured() {
+        return !receiverPlugin.isEmpty();
     }
 
     @Override
     public void close() throws IOException {
-        // TODO: Implement the close logic to close the client metrics manager.
+        subscriptionMap.clear();
+    }
+
+    private void updateClientSubscription(String subscriptionName, ClientMetricsConfigs configs) {
+        List<String> metrics = configs.getList(ClientMetricsConfigs.SUBSCRIPTION_METRICS);
+        int pushInterval = configs.getInt(ClientMetricsConfigs.PUSH_INTERVAL_MS);
+        List<String> clientMatchPattern = configs.getList(ClientMetricsConfigs.CLIENT_MATCH_PATTERN);
+
+        SubscriptionInfo newSubscription =
+            new SubscriptionInfo(subscriptionName, metrics, pushInterval,
+                ClientMetricsConfigs.parseMatchingPatterns(clientMatchPattern));
+
+        subscriptionMap.put(subscriptionName, newSubscription);
+    }
+
+    private Uuid generateNewClientId() {
+        Uuid id = Uuid.randomUuid();
+        while (clientInstanceCache.get(id) != null) {
+            id = Uuid.randomUuid();
+        }
+        return id;
+    }
+
+    private ClientMetricsInstance clientInstance(Uuid clientInstanceId, RequestContext requestContext) {
+        ClientMetricsInstance clientInstance = clientInstanceCache.get(clientInstanceId);
+
+        if (clientInstance == null) {
+            /*
+             If the client instance is not present in the cache, then create a new client instance
+             and update the cache. This can also happen when the telemetry request is received by
+             a different broker instance.
+             Although the cache is synchronized, concurrent calls could still create the same
+             client instance. Hence, safeguard the client instance creation with a double-checked
+             lock to ensure that only one instance is created.
+            */
+            synchronized (this) {
+                clientInstance = clientInstanceCache.get(clientInstanceId);
+                if (clientInstance != null) {
+                    return clientInstance;
+                }
+
+                ClientMetricsInstanceMetadata instanceMetadata = new ClientMetricsInstanceMetadata(
+                    clientInstanceId, requestContext);
+                clientInstance = createClientInstanceAndUpdateCache(clientInstanceId, instanceMetadata);
+            }
+        } else if (clientInstance.subscriptionVersion() < subscriptionUpdateVersion.get()) {
+            /*
+             If the client instance's last subscription update version is older than the latest
+             subscription update version, then re-evaluate the subscription information for the
+             client as per the updated subscriptions. This ensures that the client instance is
+             always in sync with the latest subscription information.
+             Although the cache is synchronized, concurrent calls could still update the same
+             client instance. Hence, safeguard the client instance update with a double-checked
+             lock to ensure that the instance is re-created only once.
+            */
+            synchronized (this) {
+                clientInstance = clientInstanceCache.get(clientInstanceId);
+                if (clientInstance.subscriptionVersion() >= subscriptionUpdateVersion.get()) {
+                    return clientInstance;
+                }
+                clientInstance = createClientInstanceAndUpdateCache(clientInstanceId, clientInstance.instanceMetadata());
+            }
+        }
+
+        return clientInstance;
+    }
+
+    private ClientMetricsInstance createClientInstanceAndUpdateCache(Uuid clientInstanceId,
+        ClientMetricsInstanceMetadata instanceMetadata) {
+
+        ClientMetricsInstance clientInstance = createClientInstance(clientInstanceId, instanceMetadata);
+        clientInstanceCache.put(clientInstanceId, clientInstance);
+        return clientInstance;
+    }
+
+    private ClientMetricsInstance createClientInstance(Uuid clientInstanceId, ClientMetricsInstanceMetadata instanceMetadata) {
+
+        int pushIntervalMs = ClientMetricsConfigs.DEFAULT_INTERVAL_MS;
+        // Keep a set of metrics to avoid duplicates in case of overlapping subscriptions.
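+        /*
+         Editor's note: a hedged worked example of the merge below (subscription names and
+         values invented). If subscription "sub-a" matches this client with metrics
+         [org.apache.kafka.producer.] and interval.ms 60000, and "sub-b" matches with
+         metrics [*] and interval.ms 30000, the loop collects both metric sets and
+         pushIntervalMs = min(300000, 60000, 30000) = 30000; the allMetricsSubscribed
+         clean-up that follows then reduces the set to just "*".
+        */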
+        Set<String> subscribedMetrics = new HashSet<>();
+        boolean allMetricsSubscribed = false;
+
+        int currentSubscriptionVersion = subscriptionUpdateVersion.get();
+        for (SubscriptionInfo info : subscriptionMap.values()) {
+            if (instanceMetadata.isMatch(info.matchPattern())) {
+                allMetricsSubscribed = allMetricsSubscribed || info.metrics().contains(
+                    ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG);
+                subscribedMetrics.addAll(info.metrics());
+                pushIntervalMs = Math.min(pushIntervalMs, info.intervalMs());
+            }
+        }
+
+        /*
+         If the client matches any subscription that contains the '*' metrics string, the client
+         is subscribed to all metrics, so just send the '*' string as the subscribed metrics.
+        */
+        if (allMetricsSubscribed) {
+            // Only add an * to indicate that all metrics are subscribed.
+            subscribedMetrics.clear();
+            subscribedMetrics.add(ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG);
+        }
+
+        int subscriptionId = computeSubscriptionId(subscribedMetrics, pushIntervalMs, clientInstanceId);
+
+        return new ClientMetricsInstance(clientInstanceId, instanceMetadata, subscriptionId,
+            currentSubscriptionVersion, subscribedMetrics, pushIntervalMs);
+    }
+
+    /**
+     * Computes the SubscriptionId as a unique identifier for a client instance's subscription set.
+     * The id is generated by calculating a CRC32C of the configured metrics subscriptions,
+     * including the PushIntervalMs, XORed with the ClientInstanceId.
+     */
+    private int computeSubscriptionId(Set<String> metrics, int pushIntervalMs, Uuid clientInstanceId) {
+        byte[] metricsBytes = (metrics.toString() + pushIntervalMs).getBytes(StandardCharsets.UTF_8);
+        long computedCrc = Crc32C.compute(metricsBytes, 0, metricsBytes.length);
+        return (int) computedCrc ^ clientInstanceId.hashCode();
+    }
+
+    private GetTelemetrySubscriptionsResponse createGetSubscriptionResponse(Uuid clientInstanceId,
+        ClientMetricsInstance clientInstance) {
+
+        GetTelemetrySubscriptionsResponseData data = new GetTelemetrySubscriptionsResponseData()
+            .setClientInstanceId(clientInstanceId)
+            .setSubscriptionId(clientInstance.subscriptionId())
+            .setRequestedMetrics(new ArrayList<>(clientInstance.metrics()))
+            .setAcceptedCompressionTypes(SUPPORTED_COMPRESSION_TYPES)
+            .setPushIntervalMs(clientInstance.pushIntervalMs())
+            .setTelemetryMaxBytes(clientTelemetryMaxBytes)
+            .setDeltaTemporality(true)
+            .setErrorCode(Errors.NONE.code());
+
+        return new GetTelemetrySubscriptionsResponse(data);
+    }
+
+    private void validateGetRequest(GetTelemetrySubscriptionsRequest request,
+        ClientMetricsInstance clientInstance, long timestamp) {
+
+        // Skip the throttle check when the last request failed with an error that the client
+        // should resolve by retrying immediately, i.e. an unknown subscription id or an
+        // unsupported compression type.
+        if (!clientInstance.maybeUpdateGetRequestTimestamp(timestamp) && (clientInstance.lastKnownError() != Errors.UNKNOWN_SUBSCRIPTION_ID
+            && clientInstance.lastKnownError() != Errors.UNSUPPORTED_COMPRESSION_TYPE)) {
+            String msg = String.format("Request from the client [%s] arrived before the next push interval time",
+                request.data().clientInstanceId());
+            throw new ThrottlingQuotaExceededException(msg);
+        }
+    }
+
+    private void validatePushRequest(PushTelemetryRequest request, ClientMetricsInstance clientInstance, long timestamp) {
+
+        if (clientInstance.terminating()) {
+            String msg = String.format(
+                "Client [%s] sent the previous request with the terminating state set to TRUE, cannot accept "
+                    + "any requests after that", request.data().clientInstanceId());
+            throw new InvalidRequestException(msg);
+        }
+
+        if (!clientInstance.maybeUpdatePushRequestTimestamp(timestamp) && !request.data().terminating()) {
+            String msg = String.format("Request from the client [%s] arrived before the next push interval time",
+                request.data().clientInstanceId());
+            throw new ThrottlingQuotaExceededException(msg);
+        }
+
+        if (request.data().subscriptionId() != clientInstance.subscriptionId()) {
+            String msg = String.format("Unknown client subscription id for the client [%s]",
+                request.data().clientInstanceId());
+            throw new UnknownSubscriptionIdException(msg);
+        }
+
+        if (!isSupportedCompressionType(request.data().compressionType())) {
+            String msg = String.format("Unknown compression type [%s] received in telemetry request from [%s]",
+                request.data().compressionType(), request.data().clientInstanceId());
+            throw new UnsupportedCompressionTypeException(msg);
+        }
+
+        if (request.data().metrics() != null && request.data().metrics().length > clientTelemetryMaxBytes) {
+            String msg = String.format("Telemetry request from [%s] is larger than the maximum allowed size [%s]",
+                request.data().clientInstanceId(), clientTelemetryMaxBytes);
+            throw new TelemetryTooLargeException(msg);
+        }
+    }
+
+    private static boolean isSupportedCompressionType(int id) {
+        try {
+            CompressionType.forId(id);
+            return true;
+        } catch (IllegalArgumentException e) {
+            return false;
+        }
+    }
+
+    // Visible for testing
+    SubscriptionInfo subscriptionInfo(String subscriptionName) {
+        return subscriptionMap.get(subscriptionName);
+    }
+
+    // Visible for testing
+    Collection<SubscriptionInfo> subscriptions() {
+        return Collections.unmodifiableCollection(subscriptionMap.values());
+    }
+
+    // Visible for testing
+    ClientMetricsInstance clientInstance(Uuid clientInstanceId) {
+        return clientInstanceCache.get(clientInstanceId);
+    }
+
+    // Visible for testing
+    int subscriptionUpdateVersion() {
+        return subscriptionUpdateVersion.get();
+    }
+
+    public static class SubscriptionInfo {
+
+        private final String name;
+        private final Set<String> metrics;
+        private final int intervalMs;
+        private final Map<String, Pattern> matchPattern;
+
+        public SubscriptionInfo(String name, List<String> metrics, int intervalMs,
+            Map<String, Pattern> matchPattern) {
+            this.name = name;
+            this.metrics = new HashSet<>(metrics);
+            this.intervalMs = intervalMs;
+            this.matchPattern = matchPattern;
+        }
+
+        public String name() {
+            return name;
+        }
+
+        public Set<String> metrics() {
+            return metrics;
+        }
+
+        public int intervalMs() {
+            return intervalMs;
+        }
+
+        public Map<String, Pattern> matchPattern() {
+            return matchPattern;
+        }
+    }
 }
diff --git a/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java b/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java
index 737039fcbbf51..3f19291fc8327 100644
--- a/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java
+++ b/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.server.metrics;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
@@ -23,6 +24,7 @@
 import org.apache.kafka.common.errors.InvalidRequestException;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -67,7 +69,7 @@
  * For more information please look at kip-714:
  * https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Clientmetricsconfiguration
  */
-public class ClientMetricsConfigs {
+public class ClientMetricsConfigs extends AbstractConfig {
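+    // Editor's note: a hedged sketch of the subscription-id scheme implemented by
+    // computeSubscriptionId in ClientMetricsManager above, rewritten against the JDK's
+    // java.util.zip.CRC32C instead of Kafka's Crc32C (the metric name and interval are
+    // invented):
+    //
+    //   Set<String> metrics = new HashSet<>(Arrays.asList("org.apache.kafka.consumer."));
+    //   int pushIntervalMs = 30000;
+    //   Uuid clientInstanceId = Uuid.randomUuid();
+    //   byte[] bytes = (metrics.toString() + pushIntervalMs).getBytes(StandardCharsets.UTF_8);
+    //   CRC32C crc = new CRC32C();
+    //   crc.update(bytes, 0, bytes.length);
+    //   int subscriptionId = (int) crc.getValue() ^ clientInstanceId.hashCode();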
public static final String SUBSCRIPTION_METRICS = "metrics"; public static final String PUSH_INTERVAL_MS = "interval.ms"; @@ -80,6 +82,9 @@ public class ClientMetricsConfigs { public static final String CLIENT_SOURCE_ADDRESS = "client_source_address"; public static final String CLIENT_SOURCE_PORT = "client_source_port"; + // '*' in client-metrics resource configs indicates that all the metrics are subscribed. + public static final String ALL_SUBSCRIBED_METRICS_CONFIG = "*"; + public static final int DEFAULT_INTERVAL_MS = 5 * 60 * 1000; // 5 minutes private static final int MIN_INTERVAL_MS = 100; // 100ms private static final int MAX_INTERVAL_MS = 3600000; // 1 hour @@ -94,9 +99,13 @@ public class ClientMetricsConfigs { )); private static final ConfigDef CONFIG = new ConfigDef() - .define(SUBSCRIPTION_METRICS, Type.LIST, Importance.MEDIUM, "Subscription metrics list") - .define(PUSH_INTERVAL_MS, Type.INT, Importance.MEDIUM, "Push interval in milliseconds") - .define(CLIENT_MATCH_PATTERN, Type.LIST, Importance.MEDIUM, "Client match pattern list"); + .define(SUBSCRIPTION_METRICS, Type.LIST, Collections.emptyList(), Importance.MEDIUM, "Subscription metrics list") + .define(PUSH_INTERVAL_MS, Type.INT, DEFAULT_INTERVAL_MS, Importance.MEDIUM, "Push interval in milliseconds") + .define(CLIENT_MATCH_PATTERN, Type.LIST, Collections.emptyList(), Importance.MEDIUM, "Client match pattern list"); + + public ClientMetricsConfigs(Properties props) { + super(CONFIG, props); + } public static ConfigDef configDef() { return CONFIG; @@ -143,46 +152,43 @@ private static void validateProperties(Properties properties) { /** * Parses the client matching patterns and builds a map with entries that has * (PatternName, PatternValue) as the entries. - * Ex: "VERSION=1.2.3" would be converted to a map entry of (Version, 1.2.3) + * Ex: "VERSION=1.2.3" would be converted to a map entry of (Version, 1.2.3) *

- * NOTES: - * Client match pattern splits the input into two parts separated by first occurrence of the character '=' + * NOTES: + * Client match pattern splits the input into two parts separated by first occurrence of the character '=' * - * @param patterns List of client matching pattern strings + * @param patterns List of client matching pattern strings * @return map of client matching pattern entries */ - public static Map parseMatchingPatterns(List patterns) { - Map patternsMap = new HashMap<>(); - if (patterns != null) { - patterns.forEach(pattern -> { - String[] nameValuePair = pattern.split("="); - if (nameValuePair.length != 2) { - throw new InvalidConfigurationException("Illegal client matching pattern: " + pattern); - } - - String param = nameValuePair[0].trim(); - String patternValue = nameValuePair[1].trim(); - if (isValidParam(param) && isValidRegExPattern(patternValue)) { - patternsMap.put(param, patternValue); - } else { - throw new InvalidConfigurationException("Illegal client matching pattern: " + pattern); - } - }); + public static Map parseMatchingPatterns(List patterns) { + if (patterns == null || patterns.isEmpty()) { + return Collections.emptyMap(); } + Map patternsMap = new HashMap<>(); + patterns.forEach(pattern -> { + String[] nameValuePair = pattern.split("="); + if (nameValuePair.length != 2) { + throw new InvalidConfigurationException("Illegal client matching pattern: " + pattern); + } + + String param = nameValuePair[0].trim(); + if (!isValidParam(param)) { + throw new InvalidConfigurationException("Illegal client matching pattern: " + pattern); + } + + try { + Pattern patternValue = Pattern.compile(nameValuePair[1].trim()); + patternsMap.put(param, patternValue); + } catch (PatternSyntaxException e) { + throw new InvalidConfigurationException("Illegal client matching pattern: " + pattern); + } + }); + return patternsMap; } private static boolean isValidParam(String paramName) { return ALLOWED_MATCH_PARAMS.contains(paramName); } - - private static boolean isValidRegExPattern(String inputPattern) { - try { - Pattern.compile(inputPattern); - } catch (PatternSyntaxException e) { - return false; - } - return true; - } } diff --git a/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsInstance.java b/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsInstance.java new file mode 100644 index 0000000000000..74d499cf30f06 --- /dev/null +++ b/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsInstance.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.metrics; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.protocol.Errors; + +import java.util.Objects; +import java.util.Set; + +/** + * Contains the metrics instance metadata and the state of the client instance. + */ +public class ClientMetricsInstance { + + private final Uuid clientInstanceId; + private final ClientMetricsInstanceMetadata instanceMetadata; + private final int subscriptionId; + private final int subscriptionVersion; + private final Set<String> metrics; + private final int pushIntervalMs; + + private long lastGetRequestTimestamp; + private long lastPushRequestTimestamp; + private volatile boolean terminating; + private volatile Errors lastKnownError; + + public ClientMetricsInstance(Uuid clientInstanceId, ClientMetricsInstanceMetadata instanceMetadata, + int subscriptionId, int subscriptionVersion, Set<String> metrics, int pushIntervalMs) { + this.clientInstanceId = Objects.requireNonNull(clientInstanceId); + this.instanceMetadata = Objects.requireNonNull(instanceMetadata); + this.subscriptionId = subscriptionId; + this.subscriptionVersion = subscriptionVersion; + this.metrics = metrics; + this.terminating = false; + this.pushIntervalMs = pushIntervalMs; + this.lastKnownError = Errors.NONE; + } + + public Uuid clientInstanceId() { + return clientInstanceId; + } + + public ClientMetricsInstanceMetadata instanceMetadata() { + return instanceMetadata; + } + + public int pushIntervalMs() { + return pushIntervalMs; + } + + public int subscriptionId() { + return subscriptionId; + } + + public int subscriptionVersion() { + return subscriptionVersion; + } + + public Set<String> metrics() { + return metrics; + } + + public boolean terminating() { + return terminating; + } + + public synchronized void terminating(boolean terminating) { + this.terminating = terminating; + } + + public Errors lastKnownError() { + return lastKnownError; + } + + public synchronized void lastKnownError(Errors lastKnownError) { + this.lastKnownError = lastKnownError; + } + + public synchronized boolean maybeUpdateGetRequestTimestamp(long currentTime) { + long lastRequestTimestamp = Math.max(lastGetRequestTimestamp, lastPushRequestTimestamp); + long timeElapsedSinceLastMsg = currentTime - lastRequestTimestamp; + if (timeElapsedSinceLastMsg >= pushIntervalMs) { + lastGetRequestTimestamp = currentTime; + return true; + } + return false; + } + + public synchronized boolean maybeUpdatePushRequestTimestamp(long currentTime) { + /* + An immediate push request after a get-subscriptions fetch can be accepted outside the push + interval, as the client applies a jitter to the push interval, which might result in a request + being sent anywhere between 0.5 * pushIntervalMs and 1.5 * pushIntervalMs. + */ + boolean canAccept = lastGetRequestTimestamp > lastPushRequestTimestamp; + if (!canAccept) { + long lastRequestTimestamp = Math.max(lastGetRequestTimestamp, lastPushRequestTimestamp); + long timeElapsedSinceLastMsg = currentTime - lastRequestTimestamp; + canAccept = timeElapsedSinceLastMsg >= pushIntervalMs; + } + + // Update the timestamp only if the request can be accepted.
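+ // Illustrative walk-through (assumed values): with pushIntervalMs = 100 and an accepted get
+ // request at t=1000, a first push at t=1010 is accepted immediately because a get arrived after
+ // the last push; a push retry at t=1020 is rejected since only 10 ms have elapsed since the last
+ // accepted push; pushes are accepted again from t=1110 onwards.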
+ if (canAccept) { + lastPushRequestTimestamp = currentTime; + } + return canAccept; + } +} diff --git a/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsInstanceMetadata.java b/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsInstanceMetadata.java new file mode 100644 index 0000000000000..1ff687121d174 --- /dev/null +++ b/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsInstanceMetadata.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.metrics; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.requests.RequestContext; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.regex.Pattern; + +/** + * Holds the client metadata attributes gathered from the client's request. + */ +public class ClientMetricsInstanceMetadata { + + private final Map<String, String> attributesMap; + + public ClientMetricsInstanceMetadata(Uuid clientInstanceId, RequestContext requestContext) { + Objects.requireNonNull(clientInstanceId); + Objects.requireNonNull(requestContext); + + attributesMap = new HashMap<>(); + + attributesMap.put(ClientMetricsConfigs.CLIENT_INSTANCE_ID, clientInstanceId.toString()); + attributesMap.put(ClientMetricsConfigs.CLIENT_ID, requestContext.clientId()); + attributesMap.put(ClientMetricsConfigs.CLIENT_SOFTWARE_NAME, requestContext.clientInformation != null ? + requestContext.clientInformation.softwareName() : null); + attributesMap.put(ClientMetricsConfigs.CLIENT_SOFTWARE_VERSION, requestContext.clientInformation != null ? + requestContext.clientInformation.softwareVersion() : null); + attributesMap.put(ClientMetricsConfigs.CLIENT_SOURCE_ADDRESS, requestContext.clientAddress != null ? + requestContext.clientAddress.getHostAddress() : null); + // KIP-714 mentions that the client source port should be the client connection's source port from the + // broker's point of view. However, the broker does not have this information; only the broker-side port + // on which the client connection is established is available. We might want to consider removing the + // client source port from the KIP, or using the broker port if that would be helpful. + // TODO: fix port + attributesMap.put(ClientMetricsConfigs.CLIENT_SOURCE_PORT, requestContext.clientAddress != null ? + requestContext.clientAddress.getHostAddress() : null); + } + + public boolean isMatch(Map<String, Pattern> patterns) { + if (!patterns.isEmpty()) { + return matchPatterns(patterns); + } + // Empty pattern is still considered as a match.
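+ // In other words, a subscription configured without any client match patterns applies to
+ // every client instance.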
+ return true; + } + + private boolean matchPatterns(Map<String, Pattern> matchingPatterns) { + return matchingPatterns.entrySet().stream() + .allMatch(entry -> { + String attribute = attributesMap.get(entry.getKey()); + return attribute != null && entry.getValue().matcher(attribute).matches(); + }); + } +} diff --git a/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsReceiverPlugin.java b/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsReceiverPlugin.java new file mode 100644 index 0000000000000..7108155b07ed8 --- /dev/null +++ b/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsReceiverPlugin.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.metrics; + +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.server.telemetry.ClientTelemetryReceiver; + +import java.util.ArrayList; +import java.util.List; + +/** + * Plugin to register client telemetry receivers and export metrics. This class is used by the Kafka + * server to export client metrics to the registered receivers. + */ +public class ClientMetricsReceiverPlugin { + + private final List<ClientTelemetryReceiver> receivers; + + public ClientMetricsReceiverPlugin() { + this.receivers = new ArrayList<>(); + } + + public boolean isEmpty() { + return receivers.isEmpty(); + } + + public void add(ClientTelemetryReceiver receiver) { + receivers.add(receiver); + } + + public DefaultClientTelemetryPayload getPayLoad(PushTelemetryRequest request) { + return new DefaultClientTelemetryPayload(request); + } + + public void exportMetrics(RequestContext context, PushTelemetryRequest request) { + DefaultClientTelemetryPayload payload = getPayLoad(request); + for (ClientTelemetryReceiver receiver : receivers) { + receiver.exportMetrics(context, payload); + } + } +} diff --git a/server/src/main/java/org/apache/kafka/server/metrics/DefaultClientTelemetryPayload.java b/server/src/main/java/org/apache/kafka/server/metrics/DefaultClientTelemetryPayload.java new file mode 100644 index 0000000000000..f7c50515228fa --- /dev/null +++ b/server/src/main/java/org/apache/kafka/server/metrics/DefaultClientTelemetryPayload.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.metrics; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.server.telemetry.ClientTelemetryPayload; + +import java.nio.ByteBuffer; + +/** + * Implements the {@code ClientTelemetryPayload} interface for the metrics payload sent by the client. + */ +public class DefaultClientTelemetryPayload implements ClientTelemetryPayload { + + final private Uuid clientInstanceId; + final private boolean isClientTerminating; + final private String metricsContentType; + final private ByteBuffer metricsData; + + DefaultClientTelemetryPayload(PushTelemetryRequest request) { + this.clientInstanceId = request.data().clientInstanceId(); + this.isClientTerminating = request.data().terminating(); + this.metricsContentType = request.metricsContentType(); + this.metricsData = request.metricsData(); + } + + @Override + public Uuid clientInstanceId() { + return this.clientInstanceId; + } + + @Override + public boolean isTerminating() { + return isClientTerminating; + } + + @Override + public String contentType() { + return metricsContentType; + } + + @Override + public ByteBuffer data() { + return metricsData; + } +} diff --git a/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java b/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java new file mode 100644 index 0000000000000..f1ba03d6e5240 --- /dev/null +++ b/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java @@ -0,0 +1,922 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData; +import org.apache.kafka.common.message.PushTelemetryRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryRequest.Builder; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.server.metrics.ClientMetricsConfigs; +import org.apache.kafka.server.metrics.ClientMetricsInstance; +import org.apache.kafka.server.metrics.ClientMetricsReceiverPlugin; +import org.apache.kafka.server.metrics.ClientMetricsTestUtils; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ClientMetricsManagerTest { + + private static final Logger LOG = LoggerFactory.getLogger(ClientMetricsManagerTest.class); + + private MockTime time; + private ClientMetricsReceiverPlugin clientMetricsReceiverPlugin; + private ClientMetricsManager clientMetricsManager; + + @BeforeEach + public void setUp() { + time = new MockTime(); + clientMetricsReceiverPlugin = new ClientMetricsReceiverPlugin(); + clientMetricsManager = new ClientMetricsManager(clientMetricsReceiverPlugin, 100, time); + } + + @Test + public void testUpdateSubscription() { + assertTrue(clientMetricsManager.subscriptions().isEmpty()); + + assertEquals(0, clientMetricsManager.subscriptionUpdateVersion()); + clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties()); + + assertEquals(1, clientMetricsManager.subscriptions().size()); + assertNotNull(clientMetricsManager.subscriptionInfo("sub-1")); + + ClientMetricsManager.SubscriptionInfo subscriptionInfo = clientMetricsManager.subscriptionInfo("sub-1"); + Set<String> metrics = subscriptionInfo.metrics(); + + // Validate metrics. + assertEquals(ClientMetricsTestUtils.DEFAULT_METRICS.split(",").length, metrics.size()); + Arrays.stream(ClientMetricsTestUtils.DEFAULT_METRICS.split(",")).forEach(metric -> + assertTrue(metrics.contains(metric))); + // Validate push interval. + assertEquals(ClientMetricsTestUtils.defaultProperties().getProperty(ClientMetricsConfigs.PUSH_INTERVAL_MS), + String.valueOf(subscriptionInfo.intervalMs())); + + // Validate match patterns.
+ assertEquals(ClientMetricsTestUtils.DEFAULT_CLIENT_MATCH_PATTERNS.size(), + subscriptionInfo.matchPattern().size()); + ClientMetricsTestUtils.DEFAULT_CLIENT_MATCH_PATTERNS.forEach(pattern -> { + String[] split = pattern.split("="); + assertTrue(subscriptionInfo.matchPattern().containsKey(split[0])); + assertEquals(split[1], subscriptionInfo.matchPattern().get(split[0]).pattern()); + }); + assertEquals(1, clientMetricsManager.subscriptionUpdateVersion()); + } + + @Test + public void testUpdateSubscriptionWithEmptyProperties() { + assertTrue(clientMetricsManager.subscriptions().isEmpty()); + assertEquals(0, clientMetricsManager.subscriptionUpdateVersion()); + clientMetricsManager.updateSubscription("sub-1", new Properties()); + // No subscription should be added as the properties are empty. + assertEquals(0, clientMetricsManager.subscriptions().size()); + assertEquals(0, clientMetricsManager.subscriptionUpdateVersion()); + } + + @Test + public void testUpdateSubscriptionWithNullProperties() { + assertTrue(clientMetricsManager.subscriptions().isEmpty()); + assertEquals(0, clientMetricsManager.subscriptionUpdateVersion()); + // Properties shouldn't be passed as null. + assertThrows(NullPointerException.class, () -> + clientMetricsManager.updateSubscription("sub-1", null)); + assertEquals(0, clientMetricsManager.subscriptions().size()); + assertEquals(0, clientMetricsManager.subscriptionUpdateVersion()); + } + + @Test + public void testUpdateSubscriptionWithInvalidMetricsProperties() { + assertTrue(clientMetricsManager.subscriptions().isEmpty()); + + Properties properties = new Properties(); + properties.put("random", "random"); + assertThrows(InvalidRequestException.class, () -> clientMetricsManager.updateSubscription("sub-1", properties)); + } + + @Test + public void testUpdateSubscriptionWithPropertiesDeletion() { + assertTrue(clientMetricsManager.subscriptions().isEmpty()); + assertEquals(0, clientMetricsManager.subscriptionUpdateVersion()); + + Properties properties = new Properties(); + properties.put("interval.ms", "100"); + clientMetricsManager.updateSubscription("sub-1", properties); + assertEquals(1, clientMetricsManager.subscriptions().size()); + assertNotNull(clientMetricsManager.subscriptionInfo("sub-1")); + assertEquals(1, clientMetricsManager.subscriptionUpdateVersion()); + + clientMetricsManager.updateSubscription("sub-1", new Properties()); + // Subscription should be removed as all properties are removed. 
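+ // Note: the delete also bumps subscriptionUpdateVersion (1 -> 2); this version counter is what
+ // lets the manager detect that cached client instances hold a stale subscription set.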
+ assertEquals(0, clientMetricsManager.subscriptions().size()); + assertEquals(2, clientMetricsManager.subscriptionUpdateVersion()); + } + + @Test + public void testGetTelemetry() throws UnknownHostException { + clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties()); + assertEquals(1, clientMetricsManager.subscriptions().size()); + + GetTelemetrySubscriptionsRequest request = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData(), true).build(); + + GetTelemetrySubscriptionsResponse response = clientMetricsManager.processGetTelemetrySubscriptionRequest( + request, ClientMetricsTestUtils.requestContext()); + + assertNotNull(response.data().clientInstanceId()); + assertTrue(response.data().subscriptionId() != 0); + + assertEquals(ClientMetricsTestUtils.DEFAULT_METRICS.split(",").length, response.data().requestedMetrics().size()); + Arrays.stream(ClientMetricsTestUtils.DEFAULT_METRICS.split(",")).forEach(metric -> + assertTrue(response.data().requestedMetrics().contains(metric))); + + assertEquals(4, response.data().acceptedCompressionTypes().size()); + // validate compression types order. + assertEquals(CompressionType.ZSTD.id, response.data().acceptedCompressionTypes().get(0)); + assertEquals(CompressionType.LZ4.id, response.data().acceptedCompressionTypes().get(1)); + assertEquals(CompressionType.GZIP.id, response.data().acceptedCompressionTypes().get(2)); + assertEquals(CompressionType.SNAPPY.id, response.data().acceptedCompressionTypes().get(3)); + assertEquals(ClientMetricsTestUtils.DEFAULT_PUSH_INTERVAL_MS, response.data().pushIntervalMs()); + assertTrue(response.data().deltaTemporality()); + assertEquals(100, response.data().telemetryMaxBytes()); + assertEquals(Errors.NONE, response.error()); + + ClientMetricsInstance instance = clientMetricsManager.clientInstance(response.data().clientInstanceId()); + assertNotNull(instance); + assertEquals(Errors.NONE, instance.lastKnownError()); + } + + @Test + public void testGetTelemetryWithoutSubscription() throws UnknownHostException { + assertTrue(clientMetricsManager.subscriptions().isEmpty()); + + GetTelemetrySubscriptionsRequest request = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData(), true).build(); + + GetTelemetrySubscriptionsResponse response = clientMetricsManager.processGetTelemetrySubscriptionRequest( + request, ClientMetricsTestUtils.requestContext()); + + assertNotNull(response.data().clientInstanceId()); + assertTrue(response.data().subscriptionId() != 0); + assertTrue(response.data().requestedMetrics().isEmpty()); + assertEquals(4, response.data().acceptedCompressionTypes().size()); + assertEquals(ClientMetricsConfigs.DEFAULT_INTERVAL_MS, response.data().pushIntervalMs()); + assertTrue(response.data().deltaTemporality()); + assertEquals(100, response.data().telemetryMaxBytes()); + assertEquals(Errors.NONE, response.error()); + + ClientMetricsInstance instance = clientMetricsManager.clientInstance(response.data().clientInstanceId()); + assertNotNull(instance); + assertEquals(Errors.NONE, instance.lastKnownError()); + } + + @Test + public void testGetTelemetryAfterPushIntervalTime() throws UnknownHostException { + GetTelemetrySubscriptionsRequest request = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData(), true).build(); + + GetTelemetrySubscriptionsResponse response = clientMetricsManager.processGetTelemetrySubscriptionRequest( + request, 
ClientMetricsTestUtils.requestContext()); + + assertNotNull(response.data().clientInstanceId()); + assertEquals(Errors.NONE, response.error()); + + time.sleep(ClientMetricsConfigs.DEFAULT_INTERVAL_MS); + + request = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData().setClientInstanceId(response.data().clientInstanceId()), true).build(); + + response = clientMetricsManager.processGetTelemetrySubscriptionRequest( + request, ClientMetricsTestUtils.requestContext()); + assertNotNull(response.data().clientInstanceId()); + assertEquals(Errors.NONE, response.error()); + } + + @Test + public void testGetTelemetryAllMetricSubscribedSubscription() throws UnknownHostException { + clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties()); + Properties properties = new Properties(); + properties.put("metrics", ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG); + clientMetricsManager.updateSubscription("sub-2", properties); + + assertEquals(2, clientMetricsManager.subscriptions().size()); + + GetTelemetrySubscriptionsRequest request = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData(), true).build(); + + GetTelemetrySubscriptionsResponse response = clientMetricsManager.processGetTelemetrySubscriptionRequest( + request, ClientMetricsTestUtils.requestContext()); + + assertNotNull(response.data().clientInstanceId()); + assertTrue(response.data().subscriptionId() != 0); + + assertEquals(1, response.data().requestedMetrics().size()); + assertTrue(response.data().requestedMetrics().contains(ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG)); + + assertEquals(4, response.data().acceptedCompressionTypes().size()); + assertEquals(ClientMetricsTestUtils.DEFAULT_PUSH_INTERVAL_MS, response.data().pushIntervalMs()); + assertTrue(response.data().deltaTemporality()); + assertEquals(100, response.data().telemetryMaxBytes()); + assertEquals(Errors.NONE, response.error()); + + ClientMetricsInstance instance = clientMetricsManager.clientInstance(response.data().clientInstanceId()); + assertNotNull(instance); + assertEquals(Errors.NONE, instance.lastKnownError()); + } + + @Test + public void testGetTelemetrySameClientImmediateRetryFail() throws UnknownHostException { + GetTelemetrySubscriptionsRequest request = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData(), true).build(); + + GetTelemetrySubscriptionsResponse response = clientMetricsManager.processGetTelemetrySubscriptionRequest( + request, ClientMetricsTestUtils.requestContext()); + + Uuid clientInstanceId = response.data().clientInstanceId(); + assertNotNull(clientInstanceId); + assertEquals(Errors.NONE, response.error()); + + request = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData().setClientInstanceId(clientInstanceId), true).build(); + response = clientMetricsManager.processGetTelemetrySubscriptionRequest( + request, ClientMetricsTestUtils.requestContext()); + + assertEquals(Errors.THROTTLING_QUOTA_EXCEEDED, response.error()); + } + + @Test + public void testGetTelemetrySameClientImmediateRetryAfterPushFail() throws UnknownHostException { + GetTelemetrySubscriptionsRequest request = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData(), true).build(); + + GetTelemetrySubscriptionsResponse response = clientMetricsManager.processGetTelemetrySubscriptionRequest( + request, ClientMetricsTestUtils.requestContext()); + + Uuid 
clientInstanceId = response.data().clientInstanceId(); + assertNotNull(clientInstanceId); + assertEquals(Errors.NONE, response.error()); + + // Create new client metrics manager which simulates a new server as it will not have any + // last request information but request should succeed as subscription id should match + // the one with new client instance. + + ClientMetricsManager newClientMetricsManager = new ClientMetricsManager(clientMetricsReceiverPlugin, 100, time); + + PushTelemetryRequest pushRequest = new Builder( + new PushTelemetryRequestData() + .setClientInstanceId(response.data().clientInstanceId()) + .setSubscriptionId(response.data().subscriptionId()) + .setCompressionType(CompressionType.NONE.id) + .setMetrics("test-data".getBytes(StandardCharsets.UTF_8)), true).build(); + + PushTelemetryResponse pushResponse = newClientMetricsManager.processPushTelemetryRequest( + pushRequest, ClientMetricsTestUtils.requestContext()); + + assertEquals(Errors.NONE, pushResponse.error()); + + request = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData().setClientInstanceId(clientInstanceId), true).build(); + + response = newClientMetricsManager.processGetTelemetrySubscriptionRequest( + request, ClientMetricsTestUtils.requestContext()); + + assertEquals(Errors.THROTTLING_QUOTA_EXCEEDED, response.error()); + } + + @Test + public void testGetTelemetryUpdateSubscription() throws UnknownHostException { + clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties()); + assertEquals(1, clientMetricsManager.subscriptions().size()); + + GetTelemetrySubscriptionsRequest request = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData(), true).build(); + + GetTelemetrySubscriptionsResponse response = clientMetricsManager.processGetTelemetrySubscriptionRequest( + request, ClientMetricsTestUtils.requestContext()); + + Uuid clientInstanceId = response.data().clientInstanceId(); + int subscriptionId = response.data().subscriptionId(); + assertNotNull(clientInstanceId); + assertTrue(subscriptionId != 0); + assertEquals(Errors.NONE, response.error()); + + // Update subscription + Properties properties = new Properties(); + properties.put("metrics", ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG); + clientMetricsManager.updateSubscription("sub-2", properties); + assertEquals(2, clientMetricsManager.subscriptions().size()); + + request = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData().setClientInstanceId(clientInstanceId), true).build(); + + response = clientMetricsManager.processGetTelemetrySubscriptionRequest( + request, ClientMetricsTestUtils.requestContext()); + + // No throttle error as the subscription has changed. 
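+ // (The instance's stored subscriptionVersion no longer matches the manager's current
+ // subscriptionUpdateVersion, so the request is served immediately instead of being throttled.)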
+ assertEquals(Errors.NONE, response.error()); + // Subscription id is updated in the next request. + assertTrue(subscriptionId != response.data().subscriptionId()); + } + + @Test + public void testGetTelemetryConcurrentRequestNewClientInstance() throws InterruptedException { + GetTelemetrySubscriptionsRequest request = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData().setClientInstanceId(Uuid.randomUuid()), true).build(); + + CountDownLatch lock = new CountDownLatch(2); + List<GetTelemetrySubscriptionsResponse> responses = Collections.synchronizedList(new ArrayList<>()); + + Thread thread = new Thread(() -> { + try { + GetTelemetrySubscriptionsResponse response = clientMetricsManager.processGetTelemetrySubscriptionRequest( + request, ClientMetricsTestUtils.requestContext()); + + responses.add(response); + } catch (UnknownHostException e) { + LOG.error("Error processing request", e); + } finally { + lock.countDown(); + } + }); + + Thread thread1 = new Thread(() -> { + try { + GetTelemetrySubscriptionsResponse response = clientMetricsManager.processGetTelemetrySubscriptionRequest( + request, ClientMetricsTestUtils.requestContext()); + + responses.add(response); + } catch (UnknownHostException e) { + LOG.error("Error processing request", e); + } finally { + lock.countDown(); + } + }); + + thread.start(); + thread1.start(); + + assertTrue(lock.await(2000, TimeUnit.MILLISECONDS)); + assertEquals(2, responses.size()); + + int throttlingErrorCount = 0; + for (GetTelemetrySubscriptionsResponse response : responses) { + if (response.error() == Errors.THROTTLING_QUOTA_EXCEEDED) { + throttlingErrorCount++; + } else { + // The request that registers the new client instance should succeed without error. + assertEquals(Errors.NONE, response.error()); + } + } + // 1 request should fail with throttling error.
+ assertEquals(1, throttlingErrorCount); + } + + @Test + public void testGetTelemetryConcurrentRequestAfterSubscriptionUpdate() + throws InterruptedException, UnknownHostException { + GetTelemetrySubscriptionsRequest request = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData().setClientInstanceId(Uuid.randomUuid()), true).build(); + + GetTelemetrySubscriptionsResponse subscriptionsResponse = clientMetricsManager.processGetTelemetrySubscriptionRequest( + request, ClientMetricsTestUtils.requestContext()); + + ClientMetricsInstance instance = clientMetricsManager.clientInstance(subscriptionsResponse.data().clientInstanceId()); + assertNotNull(instance); + + CountDownLatch lock = new CountDownLatch(2); + List<GetTelemetrySubscriptionsResponse> responses = Collections.synchronizedList(new ArrayList<>()); + + clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties()); + assertEquals(1, clientMetricsManager.subscriptions().size()); + + Thread thread = new Thread(() -> { + try { + GetTelemetrySubscriptionsResponse response = clientMetricsManager.processGetTelemetrySubscriptionRequest( + request, ClientMetricsTestUtils.requestContext()); + + responses.add(response); + } catch (UnknownHostException e) { + LOG.error("Error processing request", e); + } finally { + lock.countDown(); + } + }); + + Thread thread1 = new Thread(() -> { + try { + GetTelemetrySubscriptionsResponse response = clientMetricsManager.processGetTelemetrySubscriptionRequest( + request, ClientMetricsTestUtils.requestContext()); + + responses.add(response); + } catch (UnknownHostException e) { + LOG.error("Error processing request", e); + } finally { + lock.countDown(); + } + }); + + thread.start(); + thread1.start(); + + assertTrue(lock.await(2000, TimeUnit.MILLISECONDS)); + assertEquals(2, responses.size()); + + int throttlingErrorCount = 0; + for (GetTelemetrySubscriptionsResponse response : responses) { + if (response.error() == Errors.THROTTLING_QUOTA_EXCEEDED) { + throttlingErrorCount++; + } else { + // As the subscription has been updated, the winning request is accepted immediately without error. + assertEquals(Errors.NONE, response.error()); + } + } + // 1 request should fail with throttling error.
+ assertEquals(1, throttlingErrorCount); + } + + @Test + public void testPushTelemetry() throws UnknownHostException { + clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties()); + assertEquals(1, clientMetricsManager.subscriptions().size()); + + GetTelemetrySubscriptionsRequest subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData(), true).build(); + + GetTelemetrySubscriptionsResponse subscriptionsResponse = clientMetricsManager.processGetTelemetrySubscriptionRequest( + subscriptionsRequest, ClientMetricsTestUtils.requestContext()); + + ClientMetricsInstance instance = clientMetricsManager.clientInstance(subscriptionsResponse.data().clientInstanceId()); + assertNotNull(instance); + + PushTelemetryRequest request = new Builder( + new PushTelemetryRequestData() + .setClientInstanceId(subscriptionsResponse.data().clientInstanceId()) + .setSubscriptionId(subscriptionsResponse.data().subscriptionId()) + .setCompressionType(CompressionType.NONE.id) + .setMetrics("test-data".getBytes(StandardCharsets.UTF_8)), true).build(); + + PushTelemetryResponse response = clientMetricsManager.processPushTelemetryRequest( + request, ClientMetricsTestUtils.requestContext()); + + assertEquals(Errors.NONE, response.error()); + assertFalse(instance.terminating()); + assertEquals(Errors.NONE, instance.lastKnownError()); + } + + @Test + public void testPushTelemetryOnNewServer() throws UnknownHostException { + GetTelemetrySubscriptionsRequest subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData(), true).build(); + + GetTelemetrySubscriptionsResponse subscriptionsResponse = clientMetricsManager.processGetTelemetrySubscriptionRequest( + subscriptionsRequest, ClientMetricsTestUtils.requestContext()); + + // Create new client metrics manager which simulates a new server as it will not have any + // client instance information but request should succeed as subscription id should match + // the one with new client instance. 
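+ // (Assumed mechanism: the subscription id is derived deterministically from the subscription
+ // contents and the client instance id, so a fresh manager recomputes the same id.)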
+ + ClientMetricsManager newClientMetricsManager = new ClientMetricsManager(clientMetricsReceiverPlugin, 100, time); + + PushTelemetryRequest request = new PushTelemetryRequest.Builder( + new PushTelemetryRequestData() + .setClientInstanceId(subscriptionsResponse.data().clientInstanceId()) + .setSubscriptionId(subscriptionsResponse.data().subscriptionId()), true).build(); + + PushTelemetryResponse response = newClientMetricsManager.processPushTelemetryRequest( + request, ClientMetricsTestUtils.requestContext()); + + assertEquals(Errors.NONE, response.error()); + } + + @Test + public void testPushTelemetryAfterPushIntervalTime() throws UnknownHostException { + clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties()); + assertEquals(1, clientMetricsManager.subscriptions().size()); + + GetTelemetrySubscriptionsRequest subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData(), true).build(); + + GetTelemetrySubscriptionsResponse subscriptionsResponse = clientMetricsManager.processGetTelemetrySubscriptionRequest( + subscriptionsRequest, ClientMetricsTestUtils.requestContext()); + + PushTelemetryRequest request = new Builder( + new PushTelemetryRequestData() + .setClientInstanceId(subscriptionsResponse.data().clientInstanceId()) + .setSubscriptionId(subscriptionsResponse.data().subscriptionId()) + .setCompressionType(CompressionType.NONE.id) + .setMetrics("test-data".getBytes(StandardCharsets.UTF_8)), true).build(); + + PushTelemetryResponse response = clientMetricsManager.processPushTelemetryRequest( + request, ClientMetricsTestUtils.requestContext()); + + assertEquals(Errors.NONE, response.error()); + + time.sleep(ClientMetricsTestUtils.DEFAULT_PUSH_INTERVAL_MS); + + response = clientMetricsManager.processPushTelemetryRequest( + request, ClientMetricsTestUtils.requestContext()); + + assertEquals(Errors.NONE, response.error()); + } + + @Test + public void testPushTelemetryClientInstanceIdInvalid() throws UnknownHostException { + // Null client instance id + PushTelemetryRequest request = new PushTelemetryRequest.Builder( + new PushTelemetryRequestData().setClientInstanceId(null), true).build(); + + PushTelemetryResponse response = clientMetricsManager.processPushTelemetryRequest( + request, ClientMetricsTestUtils.requestContext()); + + assertEquals(Errors.INVALID_REQUEST, response.error()); + + // Zero client instance id + request = new PushTelemetryRequest.Builder( + new PushTelemetryRequestData().setClientInstanceId(Uuid.ZERO_UUID), true).build(); + + response = clientMetricsManager.processPushTelemetryRequest( + request, ClientMetricsTestUtils.requestContext()); + + assertEquals(Errors.INVALID_REQUEST, response.error()); + } + + @Test + public void testPushTelemetryThrottleError() throws UnknownHostException { + GetTelemetrySubscriptionsRequest subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData(), true).build(); + + GetTelemetrySubscriptionsResponse subscriptionsResponse = clientMetricsManager.processGetTelemetrySubscriptionRequest( + subscriptionsRequest, ClientMetricsTestUtils.requestContext()); + + PushTelemetryRequest request = new PushTelemetryRequest.Builder( + new PushTelemetryRequestData() + .setClientInstanceId(subscriptionsResponse.data().clientInstanceId()) + .setSubscriptionId(subscriptionsResponse.data().subscriptionId()), true).build(); + + PushTelemetryResponse response = clientMetricsManager.processPushTelemetryRequest( + 
request, ClientMetricsTestUtils.requestContext()); + // Immediate push request should succeed. + assertEquals(Errors.NONE, response.error()); + + response = clientMetricsManager.processPushTelemetryRequest( + request, ClientMetricsTestUtils.requestContext()); + // Second push request should fail with throttle error. + assertEquals(Errors.THROTTLING_QUOTA_EXCEEDED, response.error()); + + ClientMetricsInstance instance = clientMetricsManager.clientInstance(subscriptionsResponse.data().clientInstanceId()); + assertNotNull(instance); + assertFalse(instance.terminating()); + assertEquals(Errors.THROTTLING_QUOTA_EXCEEDED, instance.lastKnownError()); + } + + @Test + public void testPushTelemetryTerminatingFlag() throws UnknownHostException { + GetTelemetrySubscriptionsRequest subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData(), true).build(); + + GetTelemetrySubscriptionsResponse subscriptionsResponse = clientMetricsManager.processGetTelemetrySubscriptionRequest( + subscriptionsRequest, ClientMetricsTestUtils.requestContext()); + + PushTelemetryRequest request = new PushTelemetryRequest.Builder( + new PushTelemetryRequestData() + .setClientInstanceId(subscriptionsResponse.data().clientInstanceId()) + .setSubscriptionId(subscriptionsResponse.data().subscriptionId()), true).build(); + + PushTelemetryResponse response = clientMetricsManager.processPushTelemetryRequest( + request, ClientMetricsTestUtils.requestContext()); + + assertEquals(Errors.NONE, response.error()); + + // Push telemetry with terminating flag set to true. + request = new PushTelemetryRequest.Builder( + new PushTelemetryRequestData() + .setClientInstanceId(subscriptionsResponse.data().clientInstanceId()) + .setSubscriptionId(subscriptionsResponse.data().subscriptionId()) + .setTerminating(true), true).build(); + + response = clientMetricsManager.processPushTelemetryRequest( + request, ClientMetricsTestUtils.requestContext()); + + assertEquals(Errors.NONE, response.error()); + + ClientMetricsInstance instance = clientMetricsManager.clientInstance(subscriptionsResponse.data().clientInstanceId()); + assertNotNull(instance); + assertTrue(instance.terminating()); + assertEquals(Errors.NONE, instance.lastKnownError()); + } + + @Test + public void testPushTelemetryNextRequestPostTerminatingFlag() throws UnknownHostException { + GetTelemetrySubscriptionsRequest subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData(), true).build(); + + GetTelemetrySubscriptionsResponse subscriptionsResponse = clientMetricsManager.processGetTelemetrySubscriptionRequest( + subscriptionsRequest, ClientMetricsTestUtils.requestContext()); + + ClientMetricsInstance instance = clientMetricsManager.clientInstance(subscriptionsResponse.data().clientInstanceId()); + assertNotNull(instance); + + PushTelemetryRequest request = new PushTelemetryRequest.Builder( + new PushTelemetryRequestData() + .setClientInstanceId(subscriptionsResponse.data().clientInstanceId()) + .setSubscriptionId(subscriptionsResponse.data().subscriptionId()) + .setTerminating(true), true).build(); + + PushTelemetryResponse response = clientMetricsManager.processPushTelemetryRequest( + request, ClientMetricsTestUtils.requestContext()); + + assertEquals(Errors.NONE, response.error()); + assertTrue(instance.terminating()); + assertEquals(Errors.NONE, instance.lastKnownError()); + + request = new PushTelemetryRequest.Builder( + new PushTelemetryRequestData() + 
.setClientInstanceId(subscriptionsResponse.data().clientInstanceId()) + .setSubscriptionId(subscriptionsResponse.data().subscriptionId()) + .setTerminating(true), true).build(); + + response = clientMetricsManager.processPushTelemetryRequest( + request, ClientMetricsTestUtils.requestContext()); + + assertEquals(Errors.INVALID_REQUEST, response.error()); + assertTrue(instance.terminating()); + assertEquals(Errors.INVALID_REQUEST, instance.lastKnownError()); + } + + @Test + public void testPushTelemetrySubscriptionIdInvalid() throws UnknownHostException { + GetTelemetrySubscriptionsRequest subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData(), true).build(); + + GetTelemetrySubscriptionsResponse subscriptionsResponse = clientMetricsManager.processGetTelemetrySubscriptionRequest( + subscriptionsRequest, ClientMetricsTestUtils.requestContext()); + + ClientMetricsInstance instance = clientMetricsManager.clientInstance(subscriptionsResponse.data().clientInstanceId()); + assertNotNull(instance); + + PushTelemetryRequest request = new PushTelemetryRequest.Builder( + new PushTelemetryRequestData() + .setClientInstanceId(subscriptionsResponse.data().clientInstanceId()) + .setSubscriptionId(1234), true).build(); + + PushTelemetryResponse response = clientMetricsManager.processPushTelemetryRequest( + request, ClientMetricsTestUtils.requestContext()); + + assertEquals(Errors.UNKNOWN_SUBSCRIPTION_ID, response.error()); + assertFalse(instance.terminating()); + assertEquals(Errors.UNKNOWN_SUBSCRIPTION_ID, instance.lastKnownError()); + } + + @Test + public void testPushTelemetryCompressionTypeInvalid() throws UnknownHostException { + GetTelemetrySubscriptionsRequest subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData(), true).build(); + + GetTelemetrySubscriptionsResponse subscriptionsResponse = clientMetricsManager.processGetTelemetrySubscriptionRequest( + subscriptionsRequest, ClientMetricsTestUtils.requestContext()); + + ClientMetricsInstance instance = clientMetricsManager.clientInstance(subscriptionsResponse.data().clientInstanceId()); + assertNotNull(instance); + + PushTelemetryRequest request = new PushTelemetryRequest.Builder( + new PushTelemetryRequestData() + .setClientInstanceId(subscriptionsResponse.data().clientInstanceId()) + .setSubscriptionId(subscriptionsResponse.data().subscriptionId()) + .setCompressionType((byte) 100), true).build(); + + PushTelemetryResponse response = clientMetricsManager.processPushTelemetryRequest( + request, ClientMetricsTestUtils.requestContext()); + + assertEquals(Errors.UNSUPPORTED_COMPRESSION_TYPE, response.error()); + assertFalse(instance.terminating()); + assertEquals(Errors.UNSUPPORTED_COMPRESSION_TYPE, instance.lastKnownError()); + } + + @Test + public void testPushTelemetryNullMetricsData() throws UnknownHostException { + GetTelemetrySubscriptionsRequest subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData(), true).build(); + + GetTelemetrySubscriptionsResponse subscriptionsResponse = clientMetricsManager.processGetTelemetrySubscriptionRequest( + subscriptionsRequest, ClientMetricsTestUtils.requestContext()); + + ClientMetricsInstance instance = clientMetricsManager.clientInstance(subscriptionsResponse.data().clientInstanceId()); + assertNotNull(instance); + + PushTelemetryRequest request = new PushTelemetryRequest.Builder( + new PushTelemetryRequestData() + 
.setClientInstanceId(subscriptionsResponse.data().clientInstanceId()) + .setSubscriptionId(subscriptionsResponse.data().subscriptionId()) + .setMetrics(null), true).build(); + + PushTelemetryResponse response = clientMetricsManager.processPushTelemetryRequest( + request, ClientMetricsTestUtils.requestContext()); + + // Should not report any error though no metrics will be exported. + assertEquals(Errors.NONE, response.error()); + assertFalse(instance.terminating()); + assertEquals(Errors.NONE, instance.lastKnownError()); + } + + @Test + public void testPushTelemetryMetricsTooLarge() throws UnknownHostException { + clientMetricsManager = new ClientMetricsManager(clientMetricsReceiverPlugin, 1, time); + + GetTelemetrySubscriptionsRequest subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData(), true).build(); + + GetTelemetrySubscriptionsResponse subscriptionsResponse = clientMetricsManager.processGetTelemetrySubscriptionRequest( + subscriptionsRequest, ClientMetricsTestUtils.requestContext()); + + ClientMetricsInstance instance = clientMetricsManager.clientInstance(subscriptionsResponse.data().clientInstanceId()); + assertNotNull(instance); + + byte[] metrics = "ab".getBytes(StandardCharsets.UTF_8); + assertEquals(2, metrics.length); + + PushTelemetryRequest request = new PushTelemetryRequest.Builder( + new PushTelemetryRequestData() + .setClientInstanceId(subscriptionsResponse.data().clientInstanceId()) + .setSubscriptionId(subscriptionsResponse.data().subscriptionId()) + .setMetrics(metrics), true).build(); + + // The manager was created above with max bytes 1, which forces the error. + PushTelemetryResponse response = clientMetricsManager.processPushTelemetryRequest( + request, ClientMetricsTestUtils.requestContext()); + + // Should report a telemetry-too-large error as the metrics payload exceeds the configured max bytes.
+ assertEquals(Errors.TELEMETRY_TOO_LARGE, response.error()); + assertFalse(instance.terminating()); + assertEquals(Errors.TELEMETRY_TOO_LARGE, instance.lastKnownError()); + } + + @Test + public void testPushTelemetryConcurrentRequestNewClientInstance() throws UnknownHostException, InterruptedException { + GetTelemetrySubscriptionsRequest subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData(), true).build(); + + GetTelemetrySubscriptionsResponse subscriptionsResponse = clientMetricsManager.processGetTelemetrySubscriptionRequest( + subscriptionsRequest, ClientMetricsTestUtils.requestContext()); + + ClientMetricsInstance instance = clientMetricsManager.clientInstance(subscriptionsResponse.data().clientInstanceId()); + assertNotNull(instance); + + PushTelemetryRequest request = new Builder( + new PushTelemetryRequestData() + .setClientInstanceId(subscriptionsResponse.data().clientInstanceId()) + .setSubscriptionId(subscriptionsResponse.data().subscriptionId()) + .setCompressionType(CompressionType.NONE.id) + .setMetrics("test-data".getBytes(StandardCharsets.UTF_8)), true).build(); + + CountDownLatch lock = new CountDownLatch(2); + List<PushTelemetryResponse> responses = Collections.synchronizedList(new ArrayList<>()); + + ClientMetricsManager newClientMetricsManager = new ClientMetricsManager(clientMetricsReceiverPlugin, 100, time); + + Thread thread = new Thread(() -> { + try { + PushTelemetryResponse response = newClientMetricsManager.processPushTelemetryRequest( + request, ClientMetricsTestUtils.requestContext()); + responses.add(response); + } catch (UnknownHostException e) { + LOG.error("Error processing request", e); + } finally { + lock.countDown(); + } + }); + + Thread thread1 = new Thread(() -> { + try { + PushTelemetryResponse response = newClientMetricsManager.processPushTelemetryRequest( + request, ClientMetricsTestUtils.requestContext()); + responses.add(response); + } catch (UnknownHostException e) { + LOG.error("Error processing request", e); + } finally { + lock.countDown(); + } + }); + + thread.start(); + thread1.start(); + + assertTrue(lock.await(2000, TimeUnit.MILLISECONDS)); + assertEquals(2, responses.size()); + + int throttlingErrorCount = 0; + for (PushTelemetryResponse response : responses) { + if (response.error() == Errors.THROTTLING_QUOTA_EXCEEDED) { + throttlingErrorCount++; + } else { + // The push that wins the race should be accepted without error. + assertEquals(Errors.NONE, response.error()); + } + } + // 1 request should fail with throttling error.
+ assertEquals(1, throttlingErrorCount); + } + + @Test + public void testPushTelemetryConcurrentRequestAfterSubscriptionUpdate() throws UnknownHostException, InterruptedException { + GetTelemetrySubscriptionsRequest subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData(), true).build(); + + GetTelemetrySubscriptionsResponse subscriptionsResponse = clientMetricsManager.processGetTelemetrySubscriptionRequest( + subscriptionsRequest, ClientMetricsTestUtils.requestContext()); + + ClientMetricsInstance instance = clientMetricsManager.clientInstance(subscriptionsResponse.data().clientInstanceId()); + assertNotNull(instance); + + PushTelemetryRequest request = new Builder( + new PushTelemetryRequestData() + .setClientInstanceId(subscriptionsResponse.data().clientInstanceId()) + .setSubscriptionId(subscriptionsResponse.data().subscriptionId()) + .setCompressionType(CompressionType.NONE.id) + .setMetrics("test-data".getBytes(StandardCharsets.UTF_8)), true).build(); + + clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties()); + assertEquals(1, clientMetricsManager.subscriptions().size()); + + CountDownLatch lock = new CountDownLatch(2); + List<PushTelemetryResponse> responses = Collections.synchronizedList(new ArrayList<>()); + + Thread thread = new Thread(() -> { + try { + PushTelemetryResponse response = clientMetricsManager.processPushTelemetryRequest( + request, ClientMetricsTestUtils.requestContext()); + responses.add(response); + } catch (UnknownHostException e) { + LOG.error("Error processing request", e); + } finally { + lock.countDown(); + } + }); + + Thread thread1 = new Thread(() -> { + try { + PushTelemetryResponse response = clientMetricsManager.processPushTelemetryRequest( + request, ClientMetricsTestUtils.requestContext()); + responses.add(response); + } catch (UnknownHostException e) { + LOG.error("Error processing request", e); + } finally { + lock.countDown(); + } + }); + + thread.start(); + thread1.start(); + + assertTrue(lock.await(2000, TimeUnit.MILLISECONDS)); + assertEquals(2, responses.size()); + + int throttlingErrorCount = 0; + for (PushTelemetryResponse response : responses) { + if (response.error() == Errors.THROTTLING_QUOTA_EXCEEDED) { + throttlingErrorCount++; + } else { + // As the subscription has been updated, the non-throttled request fails with an unknown subscription id. + assertEquals(Errors.UNKNOWN_SUBSCRIPTION_ID, response.error()); + } + } + // 1 request should fail with throttling error. + assertEquals(1, throttlingErrorCount); + } +} diff --git a/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsInstanceMetadataTest.java b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsInstanceMetadataTest.java new file mode 100644 index 0000000000000..46f6c43e82286 --- /dev/null +++ b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsInstanceMetadataTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.metrics; + +import org.apache.kafka.common.Uuid; +import org.junit.jupiter.api.Test; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.regex.Pattern; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ClientMetricsInstanceMetadataTest { + + @Test + public void testIsMatchValid() throws UnknownHostException { + Uuid uuid = Uuid.randomUuid(); + ClientMetricsInstanceMetadata instanceMetadata = new ClientMetricsInstanceMetadata(uuid, ClientMetricsTestUtils.requestContext()); + // We consider empty/missing client matching patterns as valid + assertTrue(instanceMetadata.isMatch(Collections.emptyMap())); + + assertTrue(instanceMetadata.isMatch( + Collections.singletonMap(ClientMetricsConfigs.CLIENT_ID, Pattern.compile(".*")))); + assertTrue(instanceMetadata.isMatch( + Collections.singletonMap(ClientMetricsConfigs.CLIENT_ID, Pattern.compile("producer-1")))); + assertTrue(instanceMetadata.isMatch( + Collections.singletonMap(ClientMetricsConfigs.CLIENT_ID, Pattern.compile("producer.*")))); + assertTrue(instanceMetadata.isMatch( + Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, Pattern.compile(uuid.toString())))); + assertTrue(instanceMetadata.isMatch( + Collections.singletonMap(ClientMetricsConfigs.CLIENT_SOFTWARE_NAME, Pattern.compile("apache-kafka-java")))); + assertTrue(instanceMetadata.isMatch( + Collections.singletonMap(ClientMetricsConfigs.CLIENT_SOFTWARE_VERSION, Pattern.compile("3.5.2")))); + assertTrue(instanceMetadata.isMatch( + Collections.singletonMap(ClientMetricsConfigs.CLIENT_SOURCE_ADDRESS, Pattern.compile( + InetAddress.getLocalHost().getHostAddress())))); + } + + @Test + public void testIsMatchMultiplePatternValid() throws UnknownHostException { + Uuid uuid = Uuid.randomUuid(); + ClientMetricsInstanceMetadata instanceMetadata = new ClientMetricsInstanceMetadata(uuid, + ClientMetricsTestUtils.requestContext()); + + Map<String, Pattern> patternMap = new HashMap<>(); + patternMap.put(ClientMetricsConfigs.CLIENT_ID, Pattern.compile("producer-1")); + patternMap.put(ClientMetricsConfigs.CLIENT_INSTANCE_ID, Pattern.compile(uuid.toString())); + patternMap.put(ClientMetricsConfigs.CLIENT_SOFTWARE_NAME, Pattern.compile("apache-kafka-.*")); + patternMap.put(ClientMetricsConfigs.CLIENT_SOFTWARE_VERSION, Pattern.compile("3.5.2")); + patternMap.put(ClientMetricsConfigs.CLIENT_SOURCE_ADDRESS, Pattern.compile(InetAddress.getLocalHost().getHostAddress())); + + assertTrue(instanceMetadata.isMatch(patternMap)); + } + + @Test + public void testIsMatchMismatchFail() throws UnknownHostException { + Uuid uuid = Uuid.randomUuid(); + ClientMetricsInstanceMetadata instanceMetadata = new ClientMetricsInstanceMetadata(uuid, + ClientMetricsTestUtils.requestContext()); + + Map<String, Pattern> patternMap = new HashMap<>(); + patternMap.put(ClientMetricsConfigs.CLIENT_INSTANCE_ID, Pattern.compile(uuid.toString())); + patternMap.put(ClientMetricsConfigs.CLIENT_SOFTWARE_NAME,
Pattern.compile("apache-kafka-.*")); + patternMap.put(ClientMetricsConfigs.CLIENT_SOFTWARE_VERSION, Pattern.compile("3.5.2")); + patternMap.put(ClientMetricsConfigs.CLIENT_SOURCE_ADDRESS, Pattern.compile(InetAddress.getLocalHost().getHostAddress())); + + // Client id is different. + patternMap.put(ClientMetricsConfigs.CLIENT_ID, Pattern.compile("producer-2")); + assertFalse(instanceMetadata.isMatch(patternMap)); + + // Client instance id is different. + patternMap.put(ClientMetricsConfigs.CLIENT_ID, Pattern.compile("producer-1")); + patternMap.put(ClientMetricsConfigs.CLIENT_INSTANCE_ID, Pattern.compile(uuid + "random")); + assertFalse(instanceMetadata.isMatch(patternMap)); + + // Software name is different. + patternMap.put(ClientMetricsConfigs.CLIENT_INSTANCE_ID, Pattern.compile(uuid.toString())); + patternMap.put(ClientMetricsConfigs.CLIENT_SOFTWARE_NAME, Pattern.compile("apache-kafka-java-1")); + assertFalse(instanceMetadata.isMatch(patternMap)); + + // Software version is different. + patternMap.put(ClientMetricsConfigs.CLIENT_SOFTWARE_NAME, Pattern.compile("apache-kafka-java")); + patternMap.put(ClientMetricsConfigs.CLIENT_SOFTWARE_VERSION, Pattern.compile("3.5.x")); + assertFalse(instanceMetadata.isMatch(patternMap)); + + // Source address is different. + patternMap.put(ClientMetricsConfigs.CLIENT_SOFTWARE_VERSION, Pattern.compile("3.5.2")); + patternMap.put(ClientMetricsConfigs.CLIENT_SOURCE_ADDRESS, Pattern.compile("1.2.3.4")); + assertFalse(instanceMetadata.isMatch(patternMap)); + } + + @Test + public void testIsMatchWithInvalidKeyFail() throws UnknownHostException { + Uuid uuid = Uuid.randomUuid(); + ClientMetricsInstanceMetadata instanceMetadata = new ClientMetricsInstanceMetadata(uuid, + ClientMetricsTestUtils.requestContext()); + + // Unknown key in pattern map + assertFalse(instanceMetadata.isMatch(Collections.singletonMap("unknown", Pattern.compile(".*")))); + // '*' key is considered as invalid regex pattern + assertFalse(instanceMetadata.isMatch(Collections.singletonMap("*", Pattern.compile(".*")))); + } + + @Test + public void testIsMatchWithNullValueFail() throws UnknownHostException { + Uuid uuid = Uuid.randomUuid(); + ClientMetricsInstanceMetadata instanceMetadata = new ClientMetricsInstanceMetadata(uuid, + ClientMetricsTestUtils.requestContextWithNullClientInfo()); + + assertFalse(instanceMetadata.isMatch(Collections.singletonMap(ClientMetricsConfigs.CLIENT_SOFTWARE_NAME, + Pattern.compile(".*")))); + assertFalse(instanceMetadata.isMatch(Collections.singletonMap(ClientMetricsConfigs.CLIENT_SOFTWARE_VERSION, + Pattern.compile(".*")))); + } +} diff --git a/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsInstanceTest.java b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsInstanceTest.java new file mode 100644 index 0000000000000..c5109470dae49 --- /dev/null +++ b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsInstanceTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.metrics; + +import org.apache.kafka.common.Uuid; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.net.UnknownHostException; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ClientMetricsInstanceTest { + + private ClientMetricsInstance clientInstance; + + @BeforeEach + public void setUp() throws UnknownHostException { + Uuid uuid = Uuid.randomUuid(); + ClientMetricsInstanceMetadata instanceMetadata = new ClientMetricsInstanceMetadata(uuid, + ClientMetricsTestUtils.requestContext()); + clientInstance = new ClientMetricsInstance(uuid, instanceMetadata, 0, 0, null, ClientMetricsConfigs.DEFAULT_INTERVAL_MS); + } + + @Test + public void testMaybeUpdateRequestTimestampValid() { + // First request should be accepted. + assertTrue(clientInstance.maybeUpdateGetRequestTimestamp(System.currentTimeMillis())); + assertTrue(clientInstance.maybeUpdatePushRequestTimestamp(System.currentTimeMillis())); + } + + @Test + public void testMaybeUpdateGetRequestAfterElapsedTimeValid() { + assertTrue(clientInstance.maybeUpdateGetRequestTimestamp(System.currentTimeMillis() - ClientMetricsConfigs.DEFAULT_INTERVAL_MS)); + // Second request should be accepted as time since last request is greater than the push interval. + assertTrue(clientInstance.maybeUpdateGetRequestTimestamp(System.currentTimeMillis())); + } + + @Test + public void testMaybeUpdateGetRequestWithImmediateRetryFail() { + assertTrue(clientInstance.maybeUpdateGetRequestTimestamp(System.currentTimeMillis())); + // Second request should be rejected as time since last request is less than the push interval. + assertFalse(clientInstance.maybeUpdateGetRequestTimestamp(System.currentTimeMillis())); + } + + @Test + public void testMaybeUpdatePushRequestAfterElapsedTimeValid() { + assertTrue(clientInstance.maybeUpdatePushRequestTimestamp(System.currentTimeMillis() - ClientMetricsConfigs.DEFAULT_INTERVAL_MS)); + // Second request should be accepted as time since last request is greater than the push interval. + assertTrue(clientInstance.maybeUpdatePushRequestTimestamp(System.currentTimeMillis())); + } + + @Test + public void testMaybeUpdateGetRequestWithImmediateRetryAfterPushFail() { + assertTrue(clientInstance.maybeUpdatePushRequestTimestamp(System.currentTimeMillis())); + // Next request after push should be rejected as time since last request is less than the push interval. + assertFalse(clientInstance.maybeUpdateGetRequestTimestamp(System.currentTimeMillis() + 1)); + } + + @Test + public void testMaybeUpdatePushRequestWithImmediateRetryFail() { + assertTrue(clientInstance.maybeUpdatePushRequestTimestamp(System.currentTimeMillis())); + // Second request should be rejected as time since last request is less than the push interval. 
+ assertFalse(clientInstance.maybeUpdatePushRequestTimestamp(System.currentTimeMillis())); + } + + @Test + public void testMaybeUpdatePushRequestWithImmediateRetryAfterGetValid() { + assertTrue(clientInstance.maybeUpdatePushRequestTimestamp(System.currentTimeMillis() - ClientMetricsConfigs.DEFAULT_INTERVAL_MS)); + assertTrue(clientInstance.maybeUpdateGetRequestTimestamp(System.currentTimeMillis())); + // Next request after get should be accepted. + assertTrue(clientInstance.maybeUpdatePushRequestTimestamp(System.currentTimeMillis() + 1)); + } +} diff --git a/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsReceiverPluginTest.java b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsReceiverPluginTest.java new file mode 100644 index 0000000000000..7ac6fcf23d190 --- /dev/null +++ b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsReceiverPluginTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.metrics; + +import org.apache.kafka.common.message.PushTelemetryRequestData; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.server.metrics.ClientMetricsTestUtils.TestClientMetricsReceiver; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ClientMetricsReceiverPluginTest { + + private TestClientMetricsReceiver telemetryReceiver; + private ClientMetricsReceiverPlugin clientMetricsReceiverPlugin; + + @BeforeEach + public void setUp() { + telemetryReceiver = new TestClientMetricsReceiver(); + clientMetricsReceiverPlugin = new ClientMetricsReceiverPlugin(); + } + + @Test + public void testExportMetrics() throws UnknownHostException { + assertTrue(clientMetricsReceiverPlugin.isEmpty()); + + clientMetricsReceiverPlugin.add(telemetryReceiver); + assertFalse(clientMetricsReceiverPlugin.isEmpty()); + + assertEquals(0, telemetryReceiver.exportMetricsInvokedCount); + assertTrue(telemetryReceiver.metricsData.isEmpty()); + + byte[] metrics = "test-metrics".getBytes(StandardCharsets.UTF_8); + clientMetricsReceiverPlugin.exportMetrics(ClientMetricsTestUtils.requestContext(), + new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setMetrics(metrics), true).build()); + + assertEquals(1, telemetryReceiver.exportMetricsInvokedCount); + assertEquals(1, telemetryReceiver.metricsData.size()); + assertEquals(metrics, telemetryReceiver.metricsData.get(0).array()); + } +} diff --git 
a/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTestUtils.java b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTestUtils.java index f36608d9500a9..4a5ee4ac41971 100644 --- a/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTestUtils.java +++ b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTestUtils.java @@ -16,6 +16,21 @@ */ package org.apache.kafka.server.metrics; +import org.apache.kafka.common.network.ClientInformation; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.server.authorizer.AuthorizableRequestContext; +import org.apache.kafka.server.telemetry.ClientTelemetryPayload; +import org.apache.kafka.server.telemetry.ClientTelemetryReceiver; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -38,4 +53,38 @@ public static Properties defaultProperties() { props.put(ClientMetricsConfigs.CLIENT_MATCH_PATTERN, String.join(",", DEFAULT_CLIENT_MATCH_PATTERNS)); return props; } + + public static RequestContext requestContext() throws UnknownHostException { + return new RequestContext( + new RequestHeader(ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS, (short) 0, "producer-1", 0), + "1", + InetAddress.getLocalHost(), + KafkaPrincipal.ANONYMOUS, + ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), + SecurityProtocol.PLAINTEXT, + new ClientInformation("apache-kafka-java", "3.5.2"), + false); + } + + public static RequestContext requestContextWithNullClientInfo() throws UnknownHostException { + return new RequestContext( + new RequestHeader(ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS, (short) 0, "producer-1", 0), + "1", + InetAddress.getLocalHost(), + KafkaPrincipal.ANONYMOUS, + ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), + SecurityProtocol.PLAINTEXT, + null, + false); + } + + public static class TestClientMetricsReceiver implements ClientTelemetryReceiver { + public int exportMetricsInvokedCount = 0; + public List<ByteBuffer> metricsData = new ArrayList<>(); + + public void exportMetrics(AuthorizableRequestContext context, ClientTelemetryPayload payload) { + exportMetricsInvokedCount += 1; + metricsData.add(payload.data()); + } + } }
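
Note: TestClientMetricsReceiver above implements the same ClientTelemetryReceiver contract that a production broker plugin would implement, and ClientMetricsReceiverPlugin.add(...) is the registration path that ClientMetricsReceiverPluginTest's setUp exercises. A minimal sketch of such a receiver follows; the package, class name, and stdout sink are illustrative assumptions, not part of this patch.

package org.apache.kafka.example; // hypothetical package, for illustration only

import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.telemetry.ClientTelemetryPayload;
import org.apache.kafka.server.telemetry.ClientTelemetryReceiver;

import java.nio.ByteBuffer;

/**
 * Illustrative sketch of a broker-side receiver for pushed client metrics.
 * The broker invokes exportMetrics through ClientMetricsReceiverPlugin.exportMetrics,
 * as exercised by ClientMetricsReceiverPluginTest above.
 */
public class LoggingTelemetryReceiver implements ClientTelemetryReceiver {

    @Override
    public void exportMetrics(AuthorizableRequestContext context, ClientTelemetryPayload payload) {
        // payload.data() holds the serialized metrics bytes pushed by the client;
        // a real plugin would decode and forward them to a metrics backend
        // rather than print a summary line.
        ByteBuffer data = payload.data();
        System.out.printf("client instance %s pushed %d bytes of telemetry%n",
            payload.clientInstanceId(), data.remaining());
    }
}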