diff --git a/docs/installation/server-config/config-reference.rst b/docs/installation/server-config/config-reference.rst
index 0a5a2311deb..a35c742be35 100644
--- a/docs/installation/server-config/config-reference.rst
+++ b/docs/installation/server-config/config-reference.rst
@@ -317,6 +317,16 @@ The corresponding environment variable in the
`KSQL Server image `__ is
``KSQL_LISTENERS``.
+.. _ksql-metrics-tags-custom:
+
+------------------------
+ksql.metrics.tags.custom
+------------------------
+
+A list of tags to be included with emitted :ref:`JMX metrics `,
+formatted as a string of ``key:value`` pairs separated by commas.
+For example, ``key1:value1,key2:value2``.
+
.. _ksql-c3-settings:
|c3| Settings
diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
index 23b5ee3fdc7..ecb27616b98 100644
--- a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
+++ b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
@@ -15,6 +15,7 @@
package io.confluent.ksql.util;
+import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.config.ConfigItem;
@@ -143,6 +144,11 @@ public class KsqlConfig extends AbstractConfig {
private static final String
defaultSchemaRegistryUrl = "http://localhost:8081";
+ public static final String KSQL_CUSTOM_METRICS_TAGS = "ksql.metrics.tags.custom";
+ private static final String KSQL_CUSTOM_METRICS_TAGS_DOC =
+ "A list of tags to be included with emitted JMX metrics, formatted as a string of key:value "
+ + "pairs separated by commas. For example, 'key1:value1,key2:value2'.";
+
public static final String KSQL_STREAMS_PREFIX = "ksql.streams.";
public static final String KSQL_COLLECT_UDF_METRICS = "ksql.udf.collect.metrics";
@@ -447,6 +453,12 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
KSQL_SECURITY_EXTENSION_DEFAULT,
ConfigDef.Importance.LOW,
KSQL_SECURITY_EXTENSION_DOC
+ ).define(
+ KSQL_CUSTOM_METRICS_TAGS,
+ ConfigDef.Type.STRING,
+ "",
+ ConfigDef.Importance.LOW,
+ KSQL_CUSTOM_METRICS_TAGS_DOC
)
.withClientSslSupport();
for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef
@@ -684,6 +696,22 @@ public KsqlConfig overrideBreakingConfigsWithOriginalValues(final Map getStringAsMap(final String key) {
+ final String value = getString(key).trim();
+ try {
+ return value.equals("")
+ ? Collections.emptyMap()
+ : Splitter.on(",").trimResults().withKeyValueSeparator(":").split(value);
+ } catch (IllegalArgumentException e) {
+ throw new KsqlException(
+ String.format(
+ "Invalid config value for '%s'. value: %s. reason: %s",
+ key,
+ value,
+ e.getMessage()));
+ }
+ }
+
private static Set sslConfigNames() {
final ConfigDef sslConfig = new ConfigDef();
SslConfigs.addClientSslSupport(sslConfig);
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/KsqlContext.java b/ksql-engine/src/main/java/io/confluent/ksql/KsqlContext.java
index b69502ff0c3..baa8288b930 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/KsqlContext.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/KsqlContext.java
@@ -66,12 +66,12 @@ public static KsqlContext create(
final ServiceContext serviceContext = DefaultServiceContext.create(ksqlConfig);
final MutableFunctionRegistry functionRegistry = new InternalFunctionRegistry();
UdfLoader.newInstance(ksqlConfig, functionRegistry, ".").load();
- final String serviceId = ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG);
+ final ServiceInfo serviceInfo = ServiceInfo.create(ksqlConfig);
final KsqlEngine engine = new KsqlEngine(
serviceContext,
processingLogContext,
functionRegistry,
- serviceId);
+ serviceInfo);
return new KsqlContext(
serviceContext,
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/ServiceInfo.java b/ksql-engine/src/main/java/io/confluent/ksql/ServiceInfo.java
new file mode 100644
index 00000000000..45bf15d34d4
--- /dev/null
+++ b/ksql-engine/src/main/java/io/confluent/ksql/ServiceInfo.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2019 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql;
+
+import io.confluent.ksql.util.KsqlConfig;
+import java.util.Map;
+import java.util.Objects;
+
+public final class ServiceInfo {
+
+ private final String serviceId;
+ private final Map customMetricsTags;
+
+ /**
+ * Create an object to be passed from the KSQL context down to the KSQL engine.
+ */
+ public static ServiceInfo create(final KsqlConfig ksqlConfig) {
+ Objects.requireNonNull(ksqlConfig, "ksqlConfig cannot be null.");
+
+ final String serviceId = ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG);
+ final Map customMetricsTags =
+ ksqlConfig.getStringAsMap(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS);
+
+ return new ServiceInfo(serviceId, customMetricsTags);
+ }
+
+ private ServiceInfo(
+ final String serviceId,
+ final Map customMetricsTags
+ ) {
+ this.serviceId = Objects.requireNonNull(serviceId, "serviceId");
+ this.customMetricsTags = Objects.requireNonNull(customMetricsTags, "customMetricsTags");
+ }
+
+ public String serviceId() {
+ return serviceId;
+ }
+
+ public Map customMetricsTags() {
+ return customMetricsTags;
+ }
+}
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java
index 2f71c26c8ed..282896ef457 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/KsqlEngine.java
@@ -17,6 +17,7 @@
import com.google.common.collect.ImmutableList;
import io.confluent.ksql.KsqlExecutionContext;
+import io.confluent.ksql.ServiceInfo;
import io.confluent.ksql.ddl.commands.DdlCommandExec;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.internal.KsqlEngineMetrics;
@@ -67,14 +68,14 @@ public KsqlEngine(
final ServiceContext serviceContext,
final ProcessingLogContext processingLogContext,
final FunctionRegistry functionRegistry,
- final String serviceId
+ final ServiceInfo serviceInfo
) {
this(
serviceContext,
processingLogContext,
- serviceId,
+ serviceInfo.serviceId(),
new MetaStoreImpl(functionRegistry),
- KsqlEngineMetrics::new);
+ (engine) -> new KsqlEngineMetrics(engine, serviceInfo.customMetricsTags()));
}
KsqlEngine(
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/internal/KsqlEngineMetrics.java b/ksql-engine/src/main/java/io/confluent/ksql/internal/KsqlEngineMetrics.java
index 77f2d82ac8c..227499a9a4f 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/internal/KsqlEngineMetrics.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/internal/KsqlEngineMetrics.java
@@ -23,7 +23,9 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
import org.apache.kafka.common.MetricName;
@@ -55,24 +57,28 @@ public class KsqlEngineMetrics implements Closeable {
private final Sensor errorRate;
private final String ksqlServiceId;
-
+ private final Map customMetricsTags;
private final KsqlEngine ksqlEngine;
private final Metrics metrics;
- public KsqlEngineMetrics(final KsqlEngine ksqlEngine) {
- this(METRIC_GROUP_PREFIX, ksqlEngine, MetricCollectors.getMetrics());
+ public KsqlEngineMetrics(
+ final KsqlEngine ksqlEngine,
+ final Map customMetricsTags) {
+ this(METRIC_GROUP_PREFIX, ksqlEngine, MetricCollectors.getMetrics(), customMetricsTags);
}
KsqlEngineMetrics(
final String metricGroupPrefix,
final KsqlEngine ksqlEngine,
- final Metrics metrics) {
+ final Metrics metrics,
+ final Map customMetricsTags) {
this.ksqlEngine = ksqlEngine;
this.ksqlServiceId = KsqlConstants.KSQL_INTERNAL_TOPIC_PREFIX + ksqlEngine.getServiceId();
this.sensors = new ArrayList<>();
this.countMetrics = new ArrayList<>();
this.metricGroupName = metricGroupPrefix + "-query-stats";
+ this.customMetricsTags = customMetricsTags;
this.metrics = metrics;
@@ -269,7 +275,8 @@ private void configureMetric(
statSupplier.get());
// new
sensor.add(
- metrics.metricName(metricName, ksqlServiceId + metricGroupName, description),
+ metrics.metricName(
+ metricName, ksqlServiceId + metricGroupName, description, customMetricsTags),
statSupplier.get());
}
@@ -293,6 +300,7 @@ private void configureGaugeForState(
final Metrics metrics,
final String name,
final String group,
+ final Map tags,
final KafkaStreams.State state
) {
final Gauge gauge =
@@ -302,7 +310,7 @@ private void configureGaugeForState(
.filter(queryMetadata -> queryMetadata.getState().equals(state.toString()))
.count();
final String description = String.format("Count of queries in %s state.", state.toString());
- final MetricName metricName = metrics.metricName(name, group, description);
+ final MetricName metricName = metrics.metricName(name, group, description, tags);
final CountMetric countMetric = new CountMetric(metricName, gauge);
metrics.addMetric(metricName, gauge);
countMetrics.add(countMetric);
@@ -315,8 +323,9 @@ private void configureNumActiveQueriesForGivenState(
// legacy
configureGaugeForState(
metrics,
- ksqlServiceId + metricGroupName + "-" + name,
+ ksqlServiceId + metricGroupName + "-" + name,
metricGroupName,
+ Collections.emptyMap(),
state
);
// new
@@ -324,6 +333,7 @@ private void configureNumActiveQueriesForGivenState(
metrics,
name,
ksqlServiceId + metricGroupName,
+ customMetricsTags,
state
);
}
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/KsqlContextTestUtil.java b/ksql-engine/src/test/java/io/confluent/ksql/KsqlContextTestUtil.java
index fab1f18a0da..ce7305d0058 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/KsqlContextTestUtil.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/KsqlContextTestUtil.java
@@ -57,7 +57,7 @@ public static KsqlContext create(
serviceContext,
ProcessingLogContext.create(),
functionRegistry,
- ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG)
+ ServiceInfo.create(ksqlConfig)
);
return new KsqlContext(
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTestUtil.java b/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTestUtil.java
index 89e75bbf378..6dabccd66c6 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTestUtil.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTestUtil.java
@@ -29,6 +29,8 @@
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.QueryMetadata;
+import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -48,7 +50,7 @@ public static KsqlEngine createKsqlEngine(
ProcessingLogContext.create(),
"test_instance_",
metaStore,
- KsqlEngineMetrics::new
+ (engine) -> new KsqlEngineMetrics(engine, Collections.emptyMap())
);
}
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/integration/JsonFormatTest.java b/ksql-engine/src/test/java/io/confluent/ksql/integration/JsonFormatTest.java
index bcaba97a685..c6c319fd147 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/integration/JsonFormatTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/integration/JsonFormatTest.java
@@ -21,6 +21,7 @@
import io.confluent.common.utils.IntegrationTest;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.KsqlConfigTestUtil;
+import io.confluent.ksql.ServiceInfo;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.engine.KsqlEngineTestUtil;
import io.confluent.ksql.function.InternalFunctionRegistry;
@@ -96,7 +97,7 @@ public void before() throws Exception {
serviceContext,
processingLogContext,
new InternalFunctionRegistry(),
- ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG));
+ ServiceInfo.create(ksqlConfig));
topicClient = serviceContext.getTopicClient();
metaStore = ksqlEngine.getMetaStore();
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/integration/SecureIntegrationTest.java b/ksql-engine/src/test/java/io/confluent/ksql/integration/SecureIntegrationTest.java
index 2a6547a6b84..f39834ef404 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/integration/SecureIntegrationTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/integration/SecureIntegrationTest.java
@@ -21,6 +21,7 @@
import static io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster.ops;
import static io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster.prefixedResource;
import static io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster.resource;
+import static io.confluent.ksql.util.KsqlConfig.KSQL_CUSTOM_METRICS_TAGS;
import static io.confluent.ksql.util.KsqlConfig.KSQL_SERVICE_ID_CONFIG;
import static org.apache.kafka.common.acl.AclOperation.ALL;
import static org.apache.kafka.common.acl.AclOperation.CREATE;
@@ -37,6 +38,7 @@
import io.confluent.common.utils.IntegrationTest;
import io.confluent.ksql.KsqlConfigTestUtil;
+import io.confluent.ksql.ServiceInfo;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.engine.KsqlEngineTestUtil;
import io.confluent.ksql.function.InternalFunctionRegistry;
@@ -269,7 +271,7 @@ private void givenTestSetupWithConfig(final Map ksqlConfigs) {
serviceContext,
ProcessingLogContext.create(),
new InternalFunctionRegistry(),
- ksqlConfig.getString(KSQL_SERVICE_ID_CONFIG));
+ ServiceInfo.create(ksqlConfig));
execInitCreateStreamQueries();
}
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/internal/KsqlEngineMetricsTest.java b/ksql-engine/src/test/java/io/confluent/ksql/internal/KsqlEngineMetricsTest.java
index fe5d89a2cd0..5fe2f82a9b5 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/internal/KsqlEngineMetricsTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/internal/KsqlEngineMetricsTest.java
@@ -63,6 +63,7 @@ public class KsqlEngineMetricsTest {
private KsqlEngineMetrics engineMetrics;
private static final String KSQL_SERVICE_ID = "test-ksql-service-id";
private static final String metricNamePrefix = KsqlConstants.KSQL_INTERNAL_TOPIC_PREFIX + KSQL_SERVICE_ID;
+ private static final Map CUSTOM_TAGS = ImmutableMap.of("tag1", "value1", "tag2", "value2");
@Mock
private KsqlEngine ksqlEngine;
@@ -75,7 +76,7 @@ public void setUp() {
when(ksqlEngine.getServiceId()).thenReturn(KSQL_SERVICE_ID);
when(query1.getQueryApplicationId()).thenReturn("app-1");
- engineMetrics = new KsqlEngineMetrics(METRIC_GROUP, ksqlEngine, MetricCollectors.getMetrics());
+ engineMetrics = new KsqlEngineMetrics(METRIC_GROUP, ksqlEngine, MetricCollectors.getMetrics(), CUSTOM_TAGS);
}
@After
@@ -93,15 +94,6 @@ public void shouldRemoveAllSensorsOnClose() {
engineMetrics.registeredSensors().forEach(sensor -> assertThat(engineMetrics.getMetrics().getSensor(sensor.name()), is(nullValue())));
}
- private void shouldRecordRate(final String name, final double expected, final double error) {
- assertThat(
- Math.floor(getMetricValueLegacy(name)),
- closeTo(expected, error));
- assertThat(
- Math.floor(getMetricValue(name)),
- closeTo(expected, error));
- }
-
@Test
public void shouldRecordNumberOfActiveQueries() {
when(ksqlEngine.numberOfLiveQueries()).thenReturn(3);
@@ -264,7 +256,7 @@ private double getMetricValue(final String metricName) {
return Double.valueOf(
metrics.metric(
metrics.metricName(
- metricName, metricNamePrefix + METRIC_GROUP + "-query-stats")
+ metricName, metricNamePrefix + METRIC_GROUP + "-query-stats", CUSTOM_TAGS)
).metricValue().toString()
);
}
@@ -274,7 +266,7 @@ private long getLongMetricValue(final String metricName) {
return Long.parseLong(
metrics.metric(
metrics.metricName(
- metricName, metricNamePrefix + METRIC_GROUP + "-query-stats")
+ metricName, metricNamePrefix + METRIC_GROUP + "-query-stats", CUSTOM_TAGS)
).metricValue().toString()
);
}
diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java
index 71908fade75..37a1c0dcf6a 100644
--- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java
+++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java
@@ -24,6 +24,7 @@
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.confluent.ksql.ServiceInfo;
import io.confluent.ksql.ddl.DdlConfig;
import io.confluent.ksql.ddl.commands.CreateStreamCommand;
import io.confluent.ksql.ddl.commands.RegisterTopicCommand;
@@ -468,7 +469,7 @@ static KsqlRestApplication buildApplication(
serviceContext,
processingLogContext,
functionRegistry,
- ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG));
+ ServiceInfo.create(ksqlConfig));
UdfLoader.newInstance(ksqlConfig, functionRegistry, ksqlInstallDir).load();
diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/StandaloneExecutorFactory.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/StandaloneExecutorFactory.java
index 3bdaec81278..9c051e09ca9 100644
--- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/StandaloneExecutorFactory.java
+++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/StandaloneExecutorFactory.java
@@ -17,6 +17,7 @@
import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.KsqlExecutionContext;
+import io.confluent.ksql.ServiceInfo;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.function.InternalFunctionRegistry;
import io.confluent.ksql.function.MutableFunctionRegistry;
@@ -111,7 +112,7 @@ static StandaloneExecutor create(
serviceContext,
processingLogContext,
functionRegistry,
- ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG));
+ ServiceInfo.create(ksqlConfig));
final UdfLoader udfLoader =
UdfLoader.newInstance(ksqlConfig, functionRegistry, installDir);