Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add config for custom metrics tags (5.3.x) #2996

Merged
merged 2 commits into from
Jul 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docs/installation/server-config/config-reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,16 @@ The corresponding environment variable in the
`KSQL Server image <https://hub.docker.com/r/confluentinc/cp-ksql-server/>`__ is
``KSQL_LISTENERS``.

.. _ksql-metrics-tags-custom:

------------------------
ksql.metrics.tags.custom
------------------------

A list of tags to be included with emitted :ref:`JMX metrics <ksql-monitoring-and-metrics>`,
formatted as a string of ``key:value`` pairs separated by commas.
For example, ``key1:value1,key2:value2``.

.. _ksql-c3-settings:

|c3| Settings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package io.confluent.ksql.util;

import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.config.ConfigItem;
Expand Down Expand Up @@ -143,6 +144,11 @@ public class KsqlConfig extends AbstractConfig {
private static final String
defaultSchemaRegistryUrl = "http://localhost:8081";

public static final String KSQL_CUSTOM_METRICS_TAGS = "ksql.metrics.tags.custom";
private static final String KSQL_CUSTOM_METRICS_TAGS_DOC =
"A list of tags to be included with emitted JMX metrics, formatted as a string of key:value "
+ "pairs separated by commas. For example, 'key1:value1,key2:value2'.";

public static final String KSQL_STREAMS_PREFIX = "ksql.streams.";

public static final String KSQL_COLLECT_UDF_METRICS = "ksql.udf.collect.metrics";
Expand Down Expand Up @@ -447,6 +453,12 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
KSQL_SECURITY_EXTENSION_DEFAULT,
ConfigDef.Importance.LOW,
KSQL_SECURITY_EXTENSION_DOC
).define(
KSQL_CUSTOM_METRICS_TAGS,
ConfigDef.Type.STRING,
"",
ConfigDef.Importance.LOW,
KSQL_CUSTOM_METRICS_TAGS_DOC
)
.withClientSslSupport();
for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef
Expand Down Expand Up @@ -684,6 +696,22 @@ public KsqlConfig overrideBreakingConfigsWithOriginalValues(final Map<String, St
return new KsqlConfig(ConfigGeneration.LEGACY, mergedProperties, mergedStreamConfigProps);
}

public Map<String, String> getStringAsMap(final String key) {
final String value = getString(key).trim();
try {
return value.equals("")
? Collections.emptyMap()
: Splitter.on(",").trimResults().withKeyValueSeparator(":").split(value);
} catch (IllegalArgumentException e) {
throw new KsqlException(
String.format(
"Invalid config value for '%s'. value: %s. reason: %s",
key,
value,
e.getMessage()));
}
}

private static Set<String> sslConfigNames() {
final ConfigDef sslConfig = new ConfigDef();
SslConfigs.addClientSslSupport(sslConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@ public static KsqlContext create(
final ServiceContext serviceContext = DefaultServiceContext.create(ksqlConfig);
final MutableFunctionRegistry functionRegistry = new InternalFunctionRegistry();
UdfLoader.newInstance(ksqlConfig, functionRegistry, ".").load();
final String serviceId = ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG);
final ServiceInfo serviceInfo = ServiceInfo.create(ksqlConfig);
final KsqlEngine engine = new KsqlEngine(
serviceContext,
processingLogContext,
functionRegistry,
serviceId);
serviceInfo);

return new KsqlContext(
serviceContext,
Expand Down
55 changes: 55 additions & 0 deletions ksql-engine/src/main/java/io/confluent/ksql/ServiceInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql;

import io.confluent.ksql.util.KsqlConfig;
import java.util.Map;
import java.util.Objects;

public final class ServiceInfo {

private final String serviceId;
private final Map<String, String> customMetricsTags;

/**
* Create an object to be passed from the KSQL context down to the KSQL engine.
*/
public static ServiceInfo create(final KsqlConfig ksqlConfig) {
Objects.requireNonNull(ksqlConfig, "ksqlConfig cannot be null.");

final String serviceId = ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG);
final Map<String, String> customMetricsTags =
ksqlConfig.getStringAsMap(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS);

return new ServiceInfo(serviceId, customMetricsTags);
}

private ServiceInfo(
final String serviceId,
final Map<String, String> customMetricsTags
) {
this.serviceId = Objects.requireNonNull(serviceId, "serviceId");
this.customMetricsTags = Objects.requireNonNull(customMetricsTags, "customMetricsTags");
}

public String serviceId() {
return serviceId;
}

public Map<String, String> customMetricsTags() {
return customMetricsTags;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.google.common.collect.ImmutableList;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.ServiceInfo;
import io.confluent.ksql.ddl.commands.DdlCommandExec;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.internal.KsqlEngineMetrics;
Expand Down Expand Up @@ -67,14 +68,14 @@ public KsqlEngine(
final ServiceContext serviceContext,
final ProcessingLogContext processingLogContext,
final FunctionRegistry functionRegistry,
final String serviceId
final ServiceInfo serviceInfo
) {
this(
serviceContext,
processingLogContext,
serviceId,
serviceInfo.serviceId(),
new MetaStoreImpl(functionRegistry),
KsqlEngineMetrics::new);
(engine) -> new KsqlEngineMetrics(engine, serviceInfo.customMetricsTags()));
}

KsqlEngine(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
import org.apache.kafka.common.MetricName;
Expand Down Expand Up @@ -55,24 +57,28 @@ public class KsqlEngineMetrics implements Closeable {
private final Sensor errorRate;

private final String ksqlServiceId;

private final Map<String, String> customMetricsTags;

private final KsqlEngine ksqlEngine;
private final Metrics metrics;

public KsqlEngineMetrics(final KsqlEngine ksqlEngine) {
this(METRIC_GROUP_PREFIX, ksqlEngine, MetricCollectors.getMetrics());
public KsqlEngineMetrics(
final KsqlEngine ksqlEngine,
final Map<String, String> customMetricsTags) {
this(METRIC_GROUP_PREFIX, ksqlEngine, MetricCollectors.getMetrics(), customMetricsTags);
}

KsqlEngineMetrics(
final String metricGroupPrefix,
final KsqlEngine ksqlEngine,
final Metrics metrics) {
final Metrics metrics,
final Map<String, String> customMetricsTags) {
this.ksqlEngine = ksqlEngine;
this.ksqlServiceId = KsqlConstants.KSQL_INTERNAL_TOPIC_PREFIX + ksqlEngine.getServiceId();
this.sensors = new ArrayList<>();
this.countMetrics = new ArrayList<>();
this.metricGroupName = metricGroupPrefix + "-query-stats";
this.customMetricsTags = customMetricsTags;

this.metrics = metrics;

Expand Down Expand Up @@ -269,7 +275,8 @@ private void configureMetric(
statSupplier.get());
// new
sensor.add(
metrics.metricName(metricName, ksqlServiceId + metricGroupName, description),
metrics.metricName(
metricName, ksqlServiceId + metricGroupName, description, customMetricsTags),
statSupplier.get());
}

Expand All @@ -293,6 +300,7 @@ private void configureGaugeForState(
final Metrics metrics,
final String name,
final String group,
final Map<String, String> tags,
final KafkaStreams.State state
) {
final Gauge<Long> gauge =
Expand All @@ -302,7 +310,7 @@ private void configureGaugeForState(
.filter(queryMetadata -> queryMetadata.getState().equals(state.toString()))
.count();
final String description = String.format("Count of queries in %s state.", state.toString());
final MetricName metricName = metrics.metricName(name, group, description);
final MetricName metricName = metrics.metricName(name, group, description, tags);
final CountMetric countMetric = new CountMetric(metricName, gauge);
metrics.addMetric(metricName, gauge);
countMetrics.add(countMetric);
Expand All @@ -315,15 +323,17 @@ private void configureNumActiveQueriesForGivenState(
// legacy
configureGaugeForState(
metrics,
ksqlServiceId + metricGroupName + "-" + name,
ksqlServiceId + metricGroupName + "-" + name,
metricGroupName,
Collections.emptyMap(),
state
);
// new
configureGaugeForState(
metrics,
name,
ksqlServiceId + metricGroupName,
customMetricsTags,
state
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public static KsqlContext create(
serviceContext,
ProcessingLogContext.create(),
functionRegistry,
ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG)
ServiceInfo.create(ksqlConfig)
);

return new KsqlContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.QueryMetadata;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -48,7 +50,7 @@ public static KsqlEngine createKsqlEngine(
ProcessingLogContext.create(),
"test_instance_",
metaStore,
KsqlEngineMetrics::new
(engine) -> new KsqlEngineMetrics(engine, Collections.emptyMap())
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.confluent.common.utils.IntegrationTest;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.KsqlConfigTestUtil;
import io.confluent.ksql.ServiceInfo;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.engine.KsqlEngineTestUtil;
import io.confluent.ksql.function.InternalFunctionRegistry;
Expand Down Expand Up @@ -96,7 +97,7 @@ public void before() throws Exception {
serviceContext,
processingLogContext,
new InternalFunctionRegistry(),
ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG));
ServiceInfo.create(ksqlConfig));

topicClient = serviceContext.getTopicClient();
metaStore = ksqlEngine.getMetaStore();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster.ops;
import static io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster.prefixedResource;
import static io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster.resource;
import static io.confluent.ksql.util.KsqlConfig.KSQL_CUSTOM_METRICS_TAGS;
import static io.confluent.ksql.util.KsqlConfig.KSQL_SERVICE_ID_CONFIG;
import static org.apache.kafka.common.acl.AclOperation.ALL;
import static org.apache.kafka.common.acl.AclOperation.CREATE;
Expand All @@ -37,6 +38,7 @@

import io.confluent.common.utils.IntegrationTest;
import io.confluent.ksql.KsqlConfigTestUtil;
import io.confluent.ksql.ServiceInfo;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.engine.KsqlEngineTestUtil;
import io.confluent.ksql.function.InternalFunctionRegistry;
Expand Down Expand Up @@ -269,7 +271,7 @@ private void givenTestSetupWithConfig(final Map<String, Object> ksqlConfigs) {
serviceContext,
ProcessingLogContext.create(),
new InternalFunctionRegistry(),
ksqlConfig.getString(KSQL_SERVICE_ID_CONFIG));
ServiceInfo.create(ksqlConfig));

execInitCreateStreamQueries();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class KsqlEngineMetricsTest {
private KsqlEngineMetrics engineMetrics;
private static final String KSQL_SERVICE_ID = "test-ksql-service-id";
private static final String metricNamePrefix = KsqlConstants.KSQL_INTERNAL_TOPIC_PREFIX + KSQL_SERVICE_ID;
private static final Map<String, String> CUSTOM_TAGS = ImmutableMap.of("tag1", "value1", "tag2", "value2");

@Mock
private KsqlEngine ksqlEngine;
Expand All @@ -75,7 +76,7 @@ public void setUp() {
when(ksqlEngine.getServiceId()).thenReturn(KSQL_SERVICE_ID);
when(query1.getQueryApplicationId()).thenReturn("app-1");

engineMetrics = new KsqlEngineMetrics(METRIC_GROUP, ksqlEngine, MetricCollectors.getMetrics());
engineMetrics = new KsqlEngineMetrics(METRIC_GROUP, ksqlEngine, MetricCollectors.getMetrics(), CUSTOM_TAGS);
}

@After
Expand All @@ -93,15 +94,6 @@ public void shouldRemoveAllSensorsOnClose() {
engineMetrics.registeredSensors().forEach(sensor -> assertThat(engineMetrics.getMetrics().getSensor(sensor.name()), is(nullValue())));
}

private void shouldRecordRate(final String name, final double expected, final double error) {
assertThat(
Math.floor(getMetricValueLegacy(name)),
closeTo(expected, error));
assertThat(
Math.floor(getMetricValue(name)),
closeTo(expected, error));
}

@Test
public void shouldRecordNumberOfActiveQueries() {
when(ksqlEngine.numberOfLiveQueries()).thenReturn(3);
Expand Down Expand Up @@ -264,7 +256,7 @@ private double getMetricValue(final String metricName) {
return Double.valueOf(
metrics.metric(
metrics.metricName(
metricName, metricNamePrefix + METRIC_GROUP + "-query-stats")
metricName, metricNamePrefix + METRIC_GROUP + "-query-stats", CUSTOM_TAGS)
).metricValue().toString()
);
}
Expand All @@ -274,7 +266,7 @@ private long getLongMetricValue(final String metricName) {
return Long.parseLong(
metrics.metric(
metrics.metricName(
metricName, metricNamePrefix + METRIC_GROUP + "-query-stats")
metricName, metricNamePrefix + METRIC_GROUP + "-query-stats", CUSTOM_TAGS)
).metricValue().toString()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.confluent.ksql.ServiceInfo;
import io.confluent.ksql.ddl.DdlConfig;
import io.confluent.ksql.ddl.commands.CreateStreamCommand;
import io.confluent.ksql.ddl.commands.RegisterTopicCommand;
Expand Down Expand Up @@ -468,7 +469,7 @@ static KsqlRestApplication buildApplication(
serviceContext,
processingLogContext,
functionRegistry,
ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG));
ServiceInfo.create(ksqlConfig));

UdfLoader.newInstance(ksqlConfig, functionRegistry, ksqlInstallDir).load();

Expand Down
Loading