Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add config for custom metrics tags #2971

Merged
merged 5 commits into from
Jun 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion docs/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ KSQL 5.4.0 includes new features, including:
* UDAFs support STRUCTs as parameters and return values.

* KSQL now supports working with source data where the value is an anonymous Avro or JSON serialized
`ARRAY`, `MAP` or primitive type, for example `STRING` or `BIGINT`. Previously KSQL required all
``ARRAY``, ``MAP`` or primitive type, for example ``STRING`` or ``BIGINT``. Previously KSQL required all
Avro values to be Avro records, and all JSON values to be JSON objects.
For more information, refer to :ref:`ksql_single_field_wrapping`.

Expand All @@ -18,6 +18,9 @@ KSQL 5.4.0 includes new features, including:
Avro record or JSON object, depending on the format in use, or as an anonymous value.
For more information, refer to :ref:`ksql_single_field_wrapping`.

* A new config ``ksql.metrics.tags.custom`` for adding custom tags to emitted JMX metrics.
See :ref:`ksql-metrics-tags-custom` for usage.

KSQL 5.4.0 includes the following misc. changes:

* Require either the value for a ``@UdfParameter`` or for the UDF JAR to be compiled with
Expand Down
10 changes: 10 additions & 0 deletions docs/installation/server-config/config-reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,16 @@ The corresponding environment variable in the
`KSQL Server image <https://hub.docker.com/r/confluentinc/cp-ksql-server/>`__ is
``KSQL_LISTENERS``.

.. _ksql-metrics-tags-custom:

------------------------
ksql.metrics.tags.custom
------------------------

A list of tags to be included with emitted :ref:`JMX metrics <ksql-monitoring-and-metrics>`,
formatted as a string of ``key:value`` pairs separated by commas.
For example, ``key1:value1,key2:value2``.

.. _ksql-c3-settings:

|c3| Settings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package io.confluent.ksql.util;

import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.config.ConfigItem;
Expand Down Expand Up @@ -143,6 +144,11 @@ public class KsqlConfig extends AbstractConfig {
public static final String KSQL_WRAP_SINGLE_VALUES =
"ksql.persistence.wrap.single.values";

public static final String KSQL_CUSTOM_METRICS_TAGS = "ksql.metrics.tags.custom";
private static final String KSQL_CUSTOM_METRICS_TAGS_DOC =
"A list of tags to be included with emitted JMX metrics, formatted as a string of key:value "
+ "pairs separated by commas. For example, 'key1:value1,key2:value2'.";

public static final String
defaultSchemaRegistryUrl = "http://localhost:8081";

Expand Down Expand Up @@ -470,6 +476,12 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
+ "e.g. '{\"FOO\": 10}." + System.lineSeparator()
+ "Note: the DELIMITED format ignores this setting as it does not support the "
+ "concept of a STRUCT, record or object."
).define(
KSQL_CUSTOM_METRICS_TAGS,
ConfigDef.Type.STRING,
"",
ConfigDef.Importance.LOW,
KSQL_CUSTOM_METRICS_TAGS_DOC
)
.withClientSslSupport();
for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef
Expand Down Expand Up @@ -707,6 +719,22 @@ public KsqlConfig overrideBreakingConfigsWithOriginalValues(final Map<String, St
return new KsqlConfig(ConfigGeneration.LEGACY, mergedProperties, mergedStreamConfigProps);
}

public Map<String, String> getStringAsMap(final String key) {
final String value = getString(key).trim();
try {
return value.equals("")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check for the NULL value as well. I think if you set a config like config= without a value, then the value returned will be null. I recalled seen this null value in some tests with configs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, I tried setting ksql.metrics.tags.custom= just now and got the empty string. Are you able to reproduce getting a null value?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps it was a different configuration that has NULL as default.

? Collections.emptyMap()
: Splitter.on(",").trimResults().withKeyValueSeparator(":").split(value);
} catch (IllegalArgumentException e) {
throw new KsqlException(
String.format(
"Invalid config value for '%s'. value: %s. reason: %s",
key,
value,
e.getMessage()));
}
}

private static Set<String> sslConfigNames() {
final ConfigDef sslConfig = new ConfigDef();
SslConfigs.addClientSslSupport(sslConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,12 @@ public static KsqlContext create(
final ServiceContext serviceContext = DefaultServiceContext.create(ksqlConfig);
final MutableFunctionRegistry functionRegistry = new InternalFunctionRegistry();
UdfLoader.newInstance(ksqlConfig, functionRegistry, ".").load();
final String serviceId = ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG);
final ServiceInfo serviceInfo = ServiceInfo.create(ksqlConfig);
final KsqlEngine engine = new KsqlEngine(
serviceContext,
processingLogContext,
functionRegistry,
serviceId);
serviceInfo);

return new KsqlContext(
serviceContext,
Expand Down
55 changes: 55 additions & 0 deletions ksql-engine/src/main/java/io/confluent/ksql/ServiceInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql;

import io.confluent.ksql.util.KsqlConfig;
import java.util.Map;
import java.util.Objects;

public final class ServiceInfo {

private final String serviceId;
private final Map<String, String> customMetricsTags;

/**
* Create an object to be passed from the KSQL context down to the KSQL engine.
*/
public static ServiceInfo create(final KsqlConfig ksqlConfig) {
Objects.requireNonNull(ksqlConfig, "ksqlConfig cannot be null.");

final String serviceId = ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG);
final Map<String, String> customMetricsTags =
ksqlConfig.getStringAsMap(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS);

return new ServiceInfo(serviceId, customMetricsTags);
}

private ServiceInfo(
final String serviceId,
final Map<String, String> customMetricsTags
) {
this.serviceId = Objects.requireNonNull(serviceId, "serviceId");
this.customMetricsTags = Objects.requireNonNull(customMetricsTags, "customMetricsTags");
}

public String serviceId() {
return serviceId;
}

public Map<String, String> customMetricsTags() {
return customMetricsTags;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.google.common.collect.ImmutableList;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.ServiceInfo;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.internal.KsqlEngineMetrics;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
Expand Down Expand Up @@ -66,14 +67,14 @@ public KsqlEngine(
final ServiceContext serviceContext,
final ProcessingLogContext processingLogContext,
final FunctionRegistry functionRegistry,
final String serviceId
final ServiceInfo serviceInfo
) {
this(
serviceContext,
processingLogContext,
serviceId,
serviceInfo.serviceId(),
new MetaStoreImpl(functionRegistry),
KsqlEngineMetrics::new);
(engine) -> new KsqlEngineMetrics(engine, serviceInfo.customMetricsTags()));
}

KsqlEngine(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
import org.apache.kafka.common.MetricName;
Expand Down Expand Up @@ -55,24 +57,28 @@ public class KsqlEngineMetrics implements Closeable {
private final Sensor errorRate;

private final String ksqlServiceId;

private final Map<String, String> customMetricsTags;

private final KsqlEngine ksqlEngine;
private final Metrics metrics;

public KsqlEngineMetrics(final KsqlEngine ksqlEngine) {
this(METRIC_GROUP_PREFIX, ksqlEngine, MetricCollectors.getMetrics());
public KsqlEngineMetrics(
final KsqlEngine ksqlEngine,
final Map<String, String> customMetricsTags) {
this(METRIC_GROUP_PREFIX, ksqlEngine, MetricCollectors.getMetrics(), customMetricsTags);
}

KsqlEngineMetrics(
final String metricGroupPrefix,
final KsqlEngine ksqlEngine,
final Metrics metrics) {
final Metrics metrics,
final Map<String, String> customMetricsTags) {
this.ksqlEngine = ksqlEngine;
this.ksqlServiceId = KsqlConstants.KSQL_INTERNAL_TOPIC_PREFIX + ksqlEngine.getServiceId();
this.sensors = new ArrayList<>();
this.countMetrics = new ArrayList<>();
this.metricGroupName = metricGroupPrefix + "-query-stats";
this.customMetricsTags = customMetricsTags;

this.metrics = metrics;

Expand Down Expand Up @@ -292,7 +298,8 @@ private void configureMetric(
statSupplier.get());
// new
sensor.add(
metrics.metricName(metricName, ksqlServiceId + metricGroupName, description),
metrics.metricName(
metricName, ksqlServiceId + metricGroupName, description, customMetricsTags),
statSupplier.get());
}

Expand All @@ -316,6 +323,7 @@ private void configureGaugeForState(
final Metrics metrics,
final String name,
final String group,
final Map<String, String> tags,
final KafkaStreams.State state
) {
final Gauge<Long> gauge =
Expand All @@ -325,7 +333,7 @@ private void configureGaugeForState(
.filter(queryMetadata -> queryMetadata.getState().equals(state.toString()))
.count();
final String description = String.format("Count of queries in %s state.", state.toString());
final MetricName metricName = metrics.metricName(name, group, description);
final MetricName metricName = metrics.metricName(name, group, description, tags);
final CountMetric countMetric = new CountMetric(metricName, gauge);
metrics.addMetric(metricName, gauge);
countMetrics.add(countMetric);
Expand All @@ -338,15 +346,17 @@ private void configureNumActiveQueriesForGivenState(
// legacy
configureGaugeForState(
metrics,
ksqlServiceId + metricGroupName + "-" + name,
ksqlServiceId + metricGroupName + "-" + name,
metricGroupName,
Collections.emptyMap(),
state
);
// new
configureGaugeForState(
metrics,
name,
ksqlServiceId + metricGroupName,
customMetricsTags,
state
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public static KsqlContext create(
serviceContext,
ProcessingLogContext.create(),
functionRegistry,
ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG)
ServiceInfo.create(ksqlConfig)
);

return new KsqlContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.QueryMetadata;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -48,7 +50,7 @@ public static KsqlEngine createKsqlEngine(
ProcessingLogContext.create(),
"test_instance_",
metaStore,
KsqlEngineMetrics::new
(engine) -> new KsqlEngineMetrics(engine, Collections.emptyMap())
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.confluent.common.utils.IntegrationTest;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.KsqlConfigTestUtil;
import io.confluent.ksql.ServiceInfo;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.engine.KsqlEngineTestUtil;
import io.confluent.ksql.function.InternalFunctionRegistry;
Expand Down Expand Up @@ -97,7 +98,7 @@ public void before() throws Exception {
serviceContext,
ProcessingLogContext.create(),
new InternalFunctionRegistry(),
ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG));
ServiceInfo.create(ksqlConfig));

topicClient = serviceContext.getTopicClient();
metaStore = ksqlEngine.getMetaStore();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster.ops;
import static io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster.prefixedResource;
import static io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster.resource;
import static io.confluent.ksql.util.KsqlConfig.KSQL_CUSTOM_METRICS_TAGS;
import static io.confluent.ksql.util.KsqlConfig.KSQL_SERVICE_ID_CONFIG;
import static org.apache.kafka.common.acl.AclOperation.ALL;
import static org.apache.kafka.common.acl.AclOperation.CREATE;
Expand All @@ -37,6 +38,7 @@

import io.confluent.common.utils.IntegrationTest;
import io.confluent.ksql.KsqlConfigTestUtil;
import io.confluent.ksql.ServiceInfo;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.engine.KsqlEngineTestUtil;
import io.confluent.ksql.function.InternalFunctionRegistry;
Expand Down Expand Up @@ -269,7 +271,7 @@ private void givenTestSetupWithConfig(final Map<String, Object> ksqlConfigs) {
serviceContext,
ProcessingLogContext.create(),
new InternalFunctionRegistry(),
ksqlConfig.getString(KSQL_SERVICE_ID_CONFIG));
ServiceInfo.create(ksqlConfig));

execInitCreateStreamQueries();
}
Expand Down
Loading